{-# OPTIONS_HADDOCK not-home #-}

-- | The 'Merge' type and its functions are not intended for concurrent use.
-- Concurrent access should therefore be sequentialised using a suitable
-- concurrency primitive, such as an 'MVar'.
--
module Database.LSMTree.Internal.Merge (
    Merge (..)
  , MergeType (..)
  , IsMergeType (..)
  , LevelMergeType (..)
  , TreeMergeType (..)
  , Mappend
  , MergeState (..)
  , RunParams (..)
  , new
  , abort
  , complete
  , stepsToCompletion
  , stepsToCompletionCounted
  , StepResult (..)
  , steps
  , mergeRunParams
  ) where

import           Control.DeepSeq (NFData (..))
import           Control.Exception (assert)
import           Control.Monad (when)
import           Control.Monad.Class.MonadST (MonadST)
import           Control.Monad.Class.MonadSTM (MonadSTM (..))
import           Control.Monad.Class.MonadThrow (MonadMask, MonadThrow)
import           Control.Monad.Primitive (PrimState)
import           Control.RefCount
import           Data.Primitive.MutVar
import           Data.Traversable (for)
import qualified Data.Vector as V
import           Database.LSMTree.Internal.BlobRef (RawBlobRef)
import           Database.LSMTree.Internal.Entry
import           Database.LSMTree.Internal.Run (Run)
import qualified Database.LSMTree.Internal.Run as Run
import           Database.LSMTree.Internal.RunBuilder (RunBuilder, RunParams)
import qualified Database.LSMTree.Internal.RunBuilder as Builder
import qualified Database.LSMTree.Internal.RunReader as Reader
import           Database.LSMTree.Internal.RunReaders (Readers)
import qualified Database.LSMTree.Internal.RunReaders as Readers
import           Database.LSMTree.Internal.Serialise
import qualified System.FS.API as FS
import           System.FS.API (HasFS)
import           System.FS.BlockIO.API (HasBlockIO)

-- | An in-progress incremental k-way merge of 'Run's.
--
-- Since we always resolve all entries of the same key in one go, there is no
-- need to store incompletely-resolved entries.
data Merge t m h = Merge {
      -- | The kind of merge being performed, as passed to 'new'.
      mergeType        :: !t
      -- | We also store @isLastLevel mergeType@ and @isUnion mergeType@ here to
      -- avoid recomputing them again and again for each call to 'steps',
      -- which would also add an 'IsMergeType' constraint to large parts of the
      -- interface.
    , mergeIsLastLevel :: !Bool
    , mergeIsUnion     :: !Bool
      -- | Function for combining two serialised values, as passed to 'new'.
    , mergeMappend     :: !Mappend
      -- | Readers over the input runs, created by 'new'.
    , mergeReaders     :: {-# UNPACK #-} !(Readers m h)
      -- | Builder for the output run; turned into a 'Run' by 'complete'.
    , mergeBuilder     :: !(RunBuilder m h)
      -- | The result of the latest call to 'steps'. This is used to determine
      -- whether a merge can be 'complete'd.
    , mergeState       :: !(MutVar (PrimState m) MergeState)
    , mergeHasFS       :: !(HasFS m h)
    , mergeHasBlockIO  :: !(HasBlockIO m h)
    }

-- | The 'RunParams' of the run that is being built by this merge, as recorded
-- in the merge's 'RunBuilder'.
mergeRunParams :: Merge t m h -> RunParams
mergeRunParams = Builder.runBuilderParams . mergeBuilder

-- | The current state of the merge.
--
-- A merge starts out as 'Merging' (see 'new'). 'complete' moves a merge from
-- 'MergingDone' to 'Completed', and 'abort' always leaves it 'Closed'.
data MergeState =
    -- | There is still merging work to be done. This is the initial state.
    Merging
    -- | There is no more merging work to be done, but the merge still has to be
    -- completed to yield a new run. In this state 'steps' returns immediately
    -- without doing any work.
  | MergingDone
    -- | A run was yielded as the result of a merge. The merge is implicitly
    -- closed. Calling 'steps' or 'complete' in this state is an error.
  | Completed
    -- | The merge was closed before it was completed. Calling 'steps' or
    -- 'complete' in this state is an error.
  | Closed

-- | Merges can either exist on a level of the LSM, or be a union merge of two
-- tables.
--
-- Both predicates are evaluated once by 'new' and cached in the 'Merge'
-- record, so that stepping a merge does not require this class.
class IsMergeType t where
  -- | A last level merge behaves differently from a mid-level merge: last level
  -- merges can actually remove delete operations, whereas mid-level merges must
  -- preserve them.
  isLastLevel :: t -> Bool
  -- | Union merges follow the semantics of @Data.Map.unionWith (<>)@. Since the
  -- input runs are semantically treated like @Data.Map@s, deletes are ignored
  -- and inserts act like mupserts, so they need to be merged monoidally using
  -- 'resolveValue'.
  isUnion :: t -> Bool

-- | Fully general merge type, mainly useful for testing.
--
-- It covers every combination of 'isLastLevel' and 'isUnion' that the more
-- specific 'LevelMergeType' and 'TreeMergeType' can express.
data MergeType = MergeTypeMidLevel | MergeTypeLastLevel | MergeTypeUnion
  deriving stock (Eq, Show)

-- | All constructors are nullary, so matching on the constructor already
-- evaluates the value fully.
instance NFData MergeType where
  rnf MergeTypeMidLevel  = ()
  rnf MergeTypeLastLevel = ()
  rnf MergeTypeUnion     = ()

instance IsMergeType MergeType where
  -- Note that a union merge also counts as a last-level merge here.
  isLastLevel = \case
      MergeTypeMidLevel  -> False
      MergeTypeLastLevel -> True
      MergeTypeUnion     -> True
  isUnion = \case
      MergeTypeMidLevel  -> False
      MergeTypeLastLevel -> False
      MergeTypeUnion     -> True

-- | Different types of merges created as part of a regular (non-union) level.
--
-- A last level merge behaves differently from a mid-level merge: last level
-- merges can actually remove delete operations, whereas mid-level merges must
-- preserve them. This is orthogonal to the 'MergePolicy'.
data LevelMergeType = MergeMidLevel | MergeLastLevel
  deriving stock (Eq, Show)

-- | All constructors are nullary, so matching on the constructor already
-- evaluates the value fully.
instance NFData LevelMergeType where
  rnf MergeMidLevel  = ()
  rnf MergeLastLevel = ()

instance IsMergeType LevelMergeType where
  isLastLevel = \case
      MergeMidLevel  -> False
      MergeLastLevel -> True
  -- Level merges are never union merges.
  isUnion = const False

-- | Different types of merges created as part of the merging tree.
data TreeMergeType = MergeLevel | MergeUnion
  deriving stock (Eq, Show)

-- | All constructors are nullary, so matching on the constructor already
-- evaluates the value fully.
instance NFData TreeMergeType where
  rnf MergeLevel = ()
  rnf MergeUnion = ()

instance IsMergeType TreeMergeType where
  -- Every merge in the merging tree is treated as a last-level merge.
  isLastLevel = const True
  isUnion = \case
      MergeLevel -> False
      MergeUnion -> True

-- | A binary function combining two serialised values into one. It is passed
-- to 'new' and stored in the 'Merge'.
--
-- NOTE(review): presumably used to resolve several mupsert values for the
-- same key, and the argument order is presumably newer-then-older -- confirm
-- against the call sites.
type Mappend = SerialisedValue -> SerialisedValue -> SerialisedValue

{-# SPECIALISE new ::
     IsMergeType t
  => HasFS IO h
  -> HasBlockIO IO h
  -> RunParams
  -> t
  -> Mappend
  -> Run.RunFsPaths
  -> V.Vector (Ref (Run IO h))
  -> IO (Maybe (Merge t IO h)) #-}
-- | Set up a new merge of the given input runs, writing the result to the
-- given target paths.
--
-- Returns 'Nothing' if no input 'Run' contains any entries.
-- The list of runs should be sorted from new to old.
--
-- On success the merge starts out in the 'Merging' state. It must eventually
-- be finished with 'complete' or discarded with 'abort'.
new ::
     (IsMergeType t, MonadMask m, MonadSTM m, MonadST m)
  => HasFS m h
  -> HasBlockIO m h
  -> RunParams
  -> t
  -> Mappend
  -> Run.RunFsPaths
  -> V.Vector (Ref (Run m h))
  -> m (Maybe (Merge t m h))
new hfs hbio runParams mergeType mergeMappend targetPaths runs = do
    -- no offset, no write buffer
    mreaders <- Readers.new Readers.NoOffsetKey Nothing runs
    -- TODO: Exception safety! If Readers.new fails after already creating some
    -- run readers, or Builder.new fails, the run readers will stay open,
    -- holding handles of the input runs' files.
    for mreaders $ \mergeReaders -> do
      -- calculate upper bounds based on input runs
      let numEntries = V.foldMap' Run.size runs
      mergeBuilder <- Builder.new hfs hbio runParams targetPaths numEntries
      mergeState <- newMutVar $! Merging
      return Merge {
          -- cache the class predicates, so that 'steps' does not need an
          -- 'IsMergeType' constraint
          mergeIsLastLevel = isLastLevel mergeType
        , mergeIsUnion = isUnion mergeType
        , mergeHasFS = hfs
        , mergeHasBlockIO = hbio
          -- the remaining fields are picked up from the local bindings of the
          -- same name
        , ..
        }

{-# SPECIALISE abort :: Merge t IO (FS.Handle h) -> IO () #-}
-- | This function should be called when discarding a 'Merge' that has not been
-- 'complete'd, whether or not 'steps' has already returned 'MergeDone'. It
-- closes the input readers (if they are not drained yet) and the run builder.
-- This removes the incomplete files created for the new run so far and avoids
-- leaking file handles.
--
-- Calling it on a merge that is already completed or closed violates an
-- assertion. The merge is marked as 'Closed' afterwards in every case.
--
-- Once it has been called, do not use the 'Merge' any more!
abort :: (MonadMask m, MonadSTM m, MonadST m) => Merge t m h -> m ()
abort Merge {..} = do
    readMutVar mergeState >>= \case
      Merging -> do
        Readers.close mergeReaders
        Builder.close mergeBuilder
      MergingDone -> do
        -- the readers are already drained, therefore closed
        Builder.close mergeBuilder
      Completed ->
        -- nothing left to release; this is a usage error
        assert False $ pure ()
      Closed ->
        -- already aborted; this is a usage error
        assert False $ pure ()
    writeMutVar mergeState $! Closed

{-# SPECIALISE complete ::
     Merge t IO h
  -> IO (Ref (Run IO h)) #-}
-- | Complete a 'Merge', returning a new 'Run' as the result of merging the
-- input runs.
--
-- All resources held by the merge are released, so do not use it any more!
--
-- This function will /not/ do any merging work if there is any remaining. That
-- is, if not enough 'steps' were performed to exhaust the input 'Readers', this
-- function will throw an error.
--
-- Throws an error (using 'error') if the merge was not yet done, if it was
-- already completed before, or if it was already closed.
--
-- Note: this function creates new 'Run' resources, so it is recommended to run
-- this function with async exceptions masked. Otherwise, these resources can
-- leak. The returned run must eventually be released with 'releaseRef'.
--
complete ::
     (MonadSTM m, MonadST m, MonadMask m)
  => Merge t m h
  -> m (Ref (Run m h))
complete Merge{..} = do
    readMutVar mergeState >>= \case
      Merging -> error "complete: Merge is not done"
      MergingDone -> do
        -- the readers are already drained, therefore closed
        r <- Run.fromBuilder mergeBuilder
        -- only mark the merge as completed once the run was actually created
        writeMutVar mergeState $! Completed
        pure r
      Completed -> error "complete: Merge is already completed"
      Closed -> error "complete: Merge is closed"

{-# SPECIALISE stepsToCompletion ::
     Merge t IO h
  -> Int
  -> IO (Ref (Run IO h)) #-}
-- | Like 'steps', but calling 'complete' once the merge is finished.
--
-- Repeatedly performs batches of (at least) the given number of steps until
-- 'steps' reports 'MergeDone', and then returns the run produced by
-- 'complete'. A batch size smaller than 1 is treated as 1.
--
-- Note: run with async exceptions masked. See 'complete'.
stepsToCompletion ::
      (MonadMask m, MonadSTM m, MonadST m)
   => Merge t m h
   -> Int
   -> m (Ref (Run m h))
stepsToCompletion m stepBatchSize = go
  where
    -- A non-positive batch size would let 'steps' return 'MergeInProgress'
    -- without consuming any input, so this loop would never terminate.
    batchSize = max 1 stepBatchSize

    go = do
      steps m batchSize >>= \case
        (_, MergeInProgress) -> go
        (_, MergeDone)       -> complete m

{-# SPECIALISE stepsToCompletionCounted ::
     Merge t IO h
  -> Int
  -> IO (Int, Ref (Run IO h)) #-}
-- | Like 'stepsToCompletion', but additionally returns the total number of
-- steps performed, i.e. the sum of the step counts reported by the individual
-- calls to 'steps'. A batch size smaller than 1 is treated as 1.
--
-- Note: run with async exceptions masked. See 'complete'.
stepsToCompletionCounted ::
     (MonadMask m, MonadSTM m, MonadST m)
  => Merge t m h
  -> Int
  -> m (Int, Ref (Run m h))
stepsToCompletionCounted m stepBatchSize = go 0
  where
    -- A non-positive batch size would let 'steps' return 'MergeInProgress'
    -- without consuming any input, so this loop would never terminate.
    batchSize = max 1 stepBatchSize

    -- the accumulator is kept strict to avoid building up a chain of thunks
    go !stepsSum = do
      steps m batchSize >>= \case
        (n, MergeInProgress) -> go (stepsSum + n)
        (n, MergeDone)       -> let !stepsSum' = stepsSum + n
                                in (stepsSum',) <$> complete m

-- | The outcome of a call to 'steps'.
data StepResult =
    -- | There is merging work left, so 'steps' has to be called again.
    MergeInProgress
    -- | All merging work is done, the merge can now be 'complete'd.
  | MergeDone
  deriving stock Eq

-- | Invariant on the result of 'steps': as long as the merge is still in
-- progress, at least the requested number of steps must have been performed.
-- There is no lower bound once the merge is done.
stepsInvariant :: Int -> (Int, StepResult) -> Bool
stepsInvariant requestedSteps = \case
    (n, MergeInProgress) -> n >= requestedSteps
    _                    -> True

{-# SPECIALISE steps ::
     Merge t IO h
  -> Int
  -> IO (Int, StepResult) #-}
-- | Do at least a given number of steps of merging. Each step reads a single
-- entry, then either resolves the previous entry with the new one or writes it
-- out to the run being created. Since we always finish resolving a key we
-- started, we might do slightly more work than requested.
--
-- Returns the number of input entries read, which is guaranteed to be at least
-- as many as requested (unless the merge is complete). If the merge was
-- already done before the call, no work is performed and @(0, 'MergeDone')@ is
-- returned.
--
-- Throws an error (using 'error') if the merge was already completed or
-- closed.
steps ::
     (MonadMask m, MonadSTM m, MonadST m)
  => Merge t m h
  -> Int  -- ^ How many input entries to consume (at least)
  -> m (Int, StepResult)
steps m@Merge {..} requestedSteps = assertStepsInvariant <$> do
    -- TODO: ideally, we would not check whether the merge was already done on
    -- every call to @steps@. It is important for correctness, however, that we
    -- do not call @steps@ on a merge when it was already done. It is not yet
    -- clear whether our (upcoming) implementation of scheduled merges is going
    -- to satisfy this precondition when it calls @steps@, so for now we do the
    -- check.
    readMutVar mergeState >>= \case
      Merging     -> if mergeIsUnion then doStepsUnion m requestedSteps
                                     else doStepsLevel m requestedSteps
      MergingDone -> pure (0, MergeDone)
      Completed   -> error "steps: Merge is completed"
      Closed      -> error "steps: Merge is closed"
  where
    -- check 'stepsInvariant' on the result (only if assertions are enabled)
    assertStepsInvariant res = assert (stepsInvariant requestedSteps res) res

{-# SPECIALISE doStepsLevel ::
     Merge t IO h
  -> Int
  -> IO (Int, StepResult) #-}
doStepsLevel ::
     (MonadMask m, MonadSTM m, MonadST m)
  => Merge t m h
  -> Int  -- ^ How many input entries to consume (at least)
  -> m (Int, StepResult)
doStepsLevel :: forall (m :: * -> *) t h.
(MonadMask m, MonadSTM m, MonadST m) =>
Merge t m h -> Int -> m (Int, StepResult)
doStepsLevel m :: Merge t m h
m@Merge {t
Bool
HasFS m h
HasBlockIO m h
MutVar (PrimState m) MergeState
RunBuilder m h
Readers m h
Mappend
mergeType :: forall t (m :: * -> *) h. Merge t m h -> t
mergeIsLastLevel :: forall t (m :: * -> *) h. Merge t m h -> Bool
mergeIsUnion :: forall t (m :: * -> *) h. Merge t m h -> Bool
mergeMappend :: forall t (m :: * -> *) h. Merge t m h -> Mappend
mergeReaders :: forall t (m :: * -> *) h. Merge t m h -> Readers m h
mergeBuilder :: forall t (m :: * -> *) h. Merge t m h -> RunBuilder m h
mergeState :: forall t (m :: * -> *) h.
Merge t m h -> MutVar (PrimState m) MergeState
mergeHasFS :: forall t (m :: * -> *) h. Merge t m h -> HasFS m h
mergeHasBlockIO :: forall t (m :: * -> *) h. Merge t m h -> HasBlockIO m h
mergeType :: t
mergeIsLastLevel :: Bool
mergeIsUnion :: Bool
mergeMappend :: Mappend
mergeReaders :: Readers m h
mergeBuilder :: RunBuilder m h
mergeState :: MutVar (PrimState m) MergeState
mergeHasFS :: HasFS m h
mergeHasBlockIO :: HasBlockIO m h
..} Int
requestedSteps = Int -> m (Int, StepResult)
go Int
0
  where
    go :: Int -> m (Int, StepResult)
go !Int
n
      | Int
n Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
requestedSteps =
          (Int, StepResult) -> m (Int, StepResult)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Int
n, StepResult
MergeInProgress)
      | Bool
otherwise = do
          (SerialisedKey
key, Entry m h
entry, HasMore
hasMore) <- Readers m h -> m (SerialisedKey, Entry m h, HasMore)
forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadST m) =>
Readers m h -> m (SerialisedKey, Entry m h, HasMore)
Readers.pop Readers m h
mergeReaders
          case HasMore
hasMore of
            HasMore
Readers.HasMore ->
              Int -> SerialisedKey -> Entry m h -> m (Int, StepResult)
handleEntry (Int
n Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1) SerialisedKey
key Entry m h
entry
            HasMore
Readers.Drained -> do
              -- no future entries, no previous entry to resolve, just write!
              Merge t m h -> SerialisedKey -> Entry m h -> m ()
forall (m :: * -> *) t h.
(MonadSTM m, MonadST m, MonadThrow m) =>
Merge t m h -> SerialisedKey -> Entry m h -> m ()
writeReaderEntry Merge t m h
m SerialisedKey
key Entry m h
entry
              MutVar (PrimState m) MergeState -> MergeState -> m ()
forall (m :: * -> *) a.
PrimMonad m =>
MutVar (PrimState m) a -> a -> m ()
writeMutVar MutVar (PrimState m) MergeState
mergeState (MergeState -> m ()) -> MergeState -> m ()
forall a b. (a -> b) -> a -> b
$! MergeState
MergingDone
              (Int, StepResult) -> m (Int, StepResult)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int
n Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1, StepResult
MergeDone)

    handleEntry :: Int -> SerialisedKey -> Entry m h -> m (Int, StepResult)
handleEntry !Int
n !SerialisedKey
key (Reader.Entry (Mupdate SerialisedValue
v)) =
        -- resolve small mupsert vals with the following entries of the same key
        Int -> SerialisedKey -> SerialisedValue -> m (Int, StepResult)
handleMupdate Int
n SerialisedKey
key SerialisedValue
v
    handleEntry !Int
n !SerialisedKey
key (Reader.EntryOverflow (Mupdate SerialisedValue
v) RawPage
_ Word32
len [RawOverflowPage]
overflowPages) =
        -- resolve large mupsert vals with following entries of the same key
        Int -> SerialisedKey -> SerialisedValue -> m (Int, StepResult)
handleMupdate Int
n SerialisedKey
key (Word32 -> [RawOverflowPage] -> SerialisedValue -> SerialisedValue
Reader.appendOverflow Word32
len [RawOverflowPage]
overflowPages SerialisedValue
v)
    handleEntry !Int
n !SerialisedKey
key Entry m h
entry = do
        -- otherwise, we can just drop all following entries of same key
        Merge t m h -> SerialisedKey -> Entry m h -> m ()
forall (m :: * -> *) t h.
(MonadSTM m, MonadST m, MonadThrow m) =>
Merge t m h -> SerialisedKey -> Entry m h -> m ()
writeReaderEntry Merge t m h
m SerialisedKey
key Entry m h
entry
        Int -> SerialisedKey -> m (Int, StepResult)
dropRemaining Int
n SerialisedKey
key

    -- the value is from a mupsert, complete (not just a prefix)
    handleMupdate :: Int -> SerialisedKey -> SerialisedValue -> m (Int, StepResult)
handleMupdate !Int
n !SerialisedKey
key !SerialisedValue
v = do
        SerialisedKey
nextKey <- Readers m h -> m SerialisedKey
forall (m :: * -> *) h.
PrimMonad m =>
Readers m h -> m SerialisedKey
Readers.peekKey Readers m h
mergeReaders
        if SerialisedKey
nextKey SerialisedKey -> SerialisedKey -> Bool
forall a. Eq a => a -> a -> Bool
/= SerialisedKey
key
          then do
            -- resolved all entries for this key, write it
            Merge t m h
-> SerialisedKey -> Entry SerialisedValue (RawBlobRef m h) -> m ()
forall (m :: * -> *) t h.
(MonadSTM m, MonadST m, MonadThrow m) =>
Merge t m h
-> SerialisedKey -> Entry SerialisedValue (RawBlobRef m h) -> m ()
writeSerialisedEntry Merge t m h
m SerialisedKey
key (SerialisedValue -> Entry SerialisedValue (RawBlobRef m h)
forall v b. v -> Entry v b
Mupdate SerialisedValue
v)
            Int -> m (Int, StepResult)
go Int
n
          else do
            (SerialisedKey
_, Entry m h
nextEntry, HasMore
hasMore) <- Readers m h -> m (SerialisedKey, Entry m h, HasMore)
forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadST m) =>
Readers m h -> m (SerialisedKey, Entry m h, HasMore)
Readers.pop Readers m h
mergeReaders
            -- for resolution, we need the full second value to be present
            let resolved :: Entry SerialisedValue (RawBlobRef m h)
resolved = Mappend
-> Entry SerialisedValue (RawBlobRef m h)
-> Entry SerialisedValue (RawBlobRef m h)
-> Entry SerialisedValue (RawBlobRef m h)
forall v b. (v -> v -> v) -> Entry v b -> Entry v b -> Entry v b
combine Mappend
mergeMappend
                             (SerialisedValue -> Entry SerialisedValue (RawBlobRef m h)
forall v b. v -> Entry v b
Mupdate SerialisedValue
v)
                             (Entry m h -> Entry SerialisedValue (RawBlobRef m h)
forall (m :: * -> *) h.
Entry m h -> Entry SerialisedValue (RawBlobRef m h)
Reader.toFullEntry Entry m h
nextEntry)
            case HasMore
hasMore of
              HasMore
Readers.HasMore -> case Entry SerialisedValue (RawBlobRef m h)
resolved of
                Mupdate SerialisedValue
v' ->
                  -- still a mupsert, keep resolving
                  Int -> SerialisedKey -> SerialisedValue -> m (Int, StepResult)
handleMupdate (Int
n Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1) SerialisedKey
key SerialisedValue
v'
                Entry SerialisedValue (RawBlobRef m h)
_ -> do
                  -- done with this key, now the remaining entries are obsolete
                  Merge t m h
-> SerialisedKey -> Entry SerialisedValue (RawBlobRef m h) -> m ()
forall (m :: * -> *) t h.
(MonadSTM m, MonadST m, MonadThrow m) =>
Merge t m h
-> SerialisedKey -> Entry SerialisedValue (RawBlobRef m h) -> m ()
writeSerialisedEntry Merge t m h
m SerialisedKey
key Entry SerialisedValue (RawBlobRef m h)
resolved
                  Int -> SerialisedKey -> m (Int, StepResult)
dropRemaining (Int
n Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1) SerialisedKey
key
              HasMore
Readers.Drained -> do
                Merge t m h
-> SerialisedKey -> Entry SerialisedValue (RawBlobRef m h) -> m ()
forall (m :: * -> *) t h.
(MonadSTM m, MonadST m, MonadThrow m) =>
Merge t m h
-> SerialisedKey -> Entry SerialisedValue (RawBlobRef m h) -> m ()
writeSerialisedEntry Merge t m h
m SerialisedKey
key Entry SerialisedValue (RawBlobRef m h)
resolved
                MutVar (PrimState m) MergeState -> MergeState -> m ()
forall (m :: * -> *) a.
PrimMonad m =>
MutVar (PrimState m) a -> a -> m ()
writeMutVar MutVar (PrimState m) MergeState
mergeState (MergeState -> m ()) -> MergeState -> m ()
forall a b. (a -> b) -> a -> b
$! MergeState
MergingDone
                (Int, StepResult) -> m (Int, StepResult)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int
n Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1, StepResult
MergeDone)

    dropRemaining :: Int -> SerialisedKey -> m (Int, StepResult)
dropRemaining !Int
n !SerialisedKey
key = do
        (Int
dropped, HasMore
hasMore) <- Readers m h -> SerialisedKey -> m (Int, HasMore)
forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadST m) =>
Readers m h -> SerialisedKey -> m (Int, HasMore)
Readers.dropWhileKey Readers m h
mergeReaders SerialisedKey
key
        case HasMore
hasMore of
          HasMore
Readers.HasMore -> Int -> m (Int, StepResult)
go (Int
n Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
dropped)
          HasMore
Readers.Drained -> do
            MutVar (PrimState m) MergeState -> MergeState -> m ()
forall (m :: * -> *) a.
PrimMonad m =>
MutVar (PrimState m) a -> a -> m ()
writeMutVar MutVar (PrimState m) MergeState
mergeState (MergeState -> m ()) -> MergeState -> m ()
forall a b. (a -> b) -> a -> b
$! MergeState
MergingDone
            (Int, StepResult) -> m (Int, StepResult)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int
n Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
dropped, StepResult
MergeDone)

{-# SPECIALISE doStepsUnion ::
     Merge t IO h
  -> Int
  -> IO (Int, StepResult) #-}
-- | Perform steps of a union merge.
--
-- Entries are popped from 'mergeReaders' one at a time. All entries that share
-- a key are combined with 'combineUnion' (using 'mergeMappend') before the
-- result is written out via 'writeReaderEntry'.
--
-- The step limit is only checked between keys, so the number of consumed
-- entries in the result can exceed the requested number. If the requested
-- number is not positive, nothing is read and @(0, 'MergeInProgress')@ is
-- returned.
--
-- Once the readers are drained, 'mergeState' is set to 'MergingDone' and
-- 'MergeDone' is returned.
doStepsUnion ::
     (MonadMask m, MonadSTM m, MonadST m)
  => Merge t m h
  -> Int  -- ^ How many input entries to consume (at least)
  -> m (Int, StepResult)
     -- ^ Number of input entries actually consumed, and whether the merge is
     -- now done.
doStepsUnion m@Merge {..} requestedSteps = go 0
  where
    -- @n@ counts the input entries popped so far. We only get here at a key
    -- boundary, i.e. with no partially combined entry pending.
    go !n
      | n >= requestedSteps =
          pure (n, MergeInProgress)
      | otherwise = do
          (key, entry, hasMore) <- Readers.pop mergeReaders
          handleEntry (n + 1) key entry hasMore

    -- Similar to 'handleMupdate' in 'stepsLevel', but here we have to combine
    -- all entries monoidally, so there are no obsolete/overwritten entries
    -- that we could skip.
    --
    -- TODO(optimisation): If mergeMappend is const (which happens when calling
    -- `union` on a non-monoidal table), we could skip all remaining entries for
    -- the key. Unfortunately, we can't inspect the function. This would require
    -- encoding it as something like `Const | Resolve (_ -> _ -> _)`.
    handleEntry !n !key !entry Readers.Drained = do
        -- no future entries, no previous entry to resolve, just write!
        writeReaderEntry m key entry
        writeMutVar mergeState $! MergingDone
        pure (n, MergeDone)

    handleEntry !n !key !entry Readers.HasMore = do
        nextKey <- Readers.peekKey mergeReaders
        if nextKey /= key
          then do
            -- resolved all entries for this key, write it
            writeReaderEntry m key entry
            go n
          else do
            (_, nextEntry, hasMore) <- Readers.pop mergeReaders
            -- for resolution, we need the full second value to be present
            let resolved = combineUnion mergeMappend
                             (Reader.toFullEntry entry)
                             (Reader.toFullEntry nextEntry)
            -- the combined entry is fully materialised, so it is passed on as
            -- a plain 'Reader.Entry' (no overflow pages)
            handleEntry (n + 1) key (Reader.Entry resolved) hasMore

{-# INLINE writeReaderEntry #-}
-- | Write an entry, as obtained from the readers, to the merge's run builder.
--
-- * A 'Reader.Entry' (no overflow pages) is forwarded to
--   'writeSerialisedEntry', which may drop it (see 'shouldWriteEntry').
--
-- * A 'Reader.EntryOverflow' whose prefix is an 'InsertWithBlob' is turned
--   into a full entry with 'Reader.toFullEntry' and added with
--   'Builder.addKeyOp'.
--
-- * Any other 'Reader.EntryOverflow' has its raw page and overflow pages
--   passed to 'Builder.addLargeSerialisedKeyOp' unchanged.
--
-- In both overflow cases it is asserted (not checked in production builds)
-- that 'shouldWriteEntry' holds for the prefix.
writeReaderEntry ::
     (MonadSTM m, MonadST m, MonadThrow m)
  => Merge t m h
  -> SerialisedKey
  -> Reader.Entry m h
  -> m ()
writeReaderEntry m key (Reader.Entry entryFull) =
      -- Small entry.
      -- Note that this small entry could be the only one on the page. We only
      -- care about it being small, not single-entry, since it could still end
      -- up sharing a page with other entries in the merged run.
      -- TODO(optimise): This doesn't fully exploit the case where there is a
      -- single page small entry on the page which again ends up as the only
      -- entry of a page (which would for example happen a lot if most entries
      -- have 2k-4k bytes). In that case we could have copied the RawPage
      -- (but we find out too late to easily exploit it).
      writeSerialisedEntry m key entryFull
writeReaderEntry m key entry@(Reader.EntryOverflow prefix page _ overflowPages)
  | InsertWithBlob {} <- prefix =
      assert (shouldWriteEntry m prefix) $ do -- large, can't be delete
        -- has blob, we can't just copy the first page, fall back
        -- we simply append the overflow pages to the value
        Builder.addKeyOp (mergeBuilder m) key (Reader.toFullEntry entry)
        -- TODO(optimise): This copies the overflow pages unnecessarily.
        -- We could extend the RunBuilder API to allow to either:
        -- 1. write an Entry (containing the value prefix) + [RawOverflowPage]
        -- 2. write a RawPage + SerialisedBlob + [RawOverflowPage], rewriting
        --      the raw page's blob offset (slightly faster, but a bit hacky)
  | otherwise =
      assert (shouldWriteEntry m prefix) $  -- large, can't be delete
        -- no blob, directly copy all pages as they are
        Builder.addLargeSerialisedKeyOp (mergeBuilder m) key page overflowPages

{-# INLINE writeSerialisedEntry #-}
-- | Add a fully materialised entry to the merge's run builder using
-- 'Builder.addKeyOp'.
--
-- The entry is silently skipped if 'shouldWriteEntry' returns 'False' for it.
writeSerialisedEntry ::
     (MonadSTM m, MonadST m, MonadThrow m)
  => Merge t m h
  -> SerialisedKey
  -> Entry SerialisedValue (RawBlobRef m h)
  -> m ()
writeSerialisedEntry m key entry =
    when (shouldWriteEntry m entry) $
      Builder.addKeyOp (mergeBuilder m) key entry

-- | Whether an entry should be written to the output of the merge.
--
-- A 'Delete' is dropped when 'mergeIsLastLevel' is set for the merge; every
-- other entry is always written.
--
-- On the last level we could also turn Mupdate into Insert, but no need to
-- complicate things.
shouldWriteEntry :: Merge t m h -> Entry v b -> Bool
shouldWriteEntry m Delete = not (mergeIsLastLevel m)
shouldWriteEntry _ _      = True