{-# OPTIONS_HADDOCK not-home #-}

module Database.LSMTree.Internal.WriteBufferWriter
  ( writeWriteBuffer
  ) where

import           Control.Exception (assert)
import           Control.Monad (void, when)
import           Control.Monad.Class.MonadST (MonadST (..))
import qualified Control.Monad.Class.MonadST as ST
import           Control.Monad.Class.MonadSTM (MonadSTM (..))
import           Control.Monad.Class.MonadThrow (MonadThrow)
import           Control.Monad.Primitive (PrimMonad (..))
import           Control.Monad.ST (ST)
import           Control.RefCount (Ref)
import           Data.Foldable (for_)
import           Data.Maybe (maybeToList)
import           Data.Primitive.PrimVar (PrimVar, newPrimVar)
import           Data.Word (Word64)
import           Database.LSMTree.Internal.BlobFile (BlobSpan)
import           Database.LSMTree.Internal.BlobRef (RawBlobRef)
import           Database.LSMTree.Internal.ChecksumHandle (ChecksumHandle,
                     closeHandle, copyBlob, dropCache, makeHandle, readChecksum,
                     writeRawOverflowPages, writeRawPage)
import qualified Database.LSMTree.Internal.CRC32C as CRC
import           Database.LSMTree.Internal.Entry (Entry)
import           Database.LSMTree.Internal.PageAcc (PageAcc)
import qualified Database.LSMTree.Internal.PageAcc as PageAcc
import qualified Database.LSMTree.Internal.PageAcc1 as PageAcc
import           Database.LSMTree.Internal.Paths (ForBlob (..), ForKOps (..),
                     WriteBufferFsPaths, toChecksumsFileForWriteBufferFiles,
                     writeBufferBlobPath, writeBufferChecksumsPath,
                     writeBufferKOpsPath)
import           Database.LSMTree.Internal.RawOverflowPage (RawOverflowPage)
import           Database.LSMTree.Internal.RawPage (RawPage)
import           Database.LSMTree.Internal.Serialise (SerialisedKey,
                     SerialisedValue)
import           Database.LSMTree.Internal.WriteBuffer (WriteBuffer)
import qualified Database.LSMTree.Internal.WriteBuffer as WB
import           Database.LSMTree.Internal.WriteBufferBlobs (WriteBufferBlobs)
import qualified Database.LSMTree.Internal.WriteBufferBlobs as WBB
import qualified System.FS.API as FS
import           System.FS.API (HasFS)
import qualified System.FS.BlockIO.API as FS
import           System.FS.BlockIO.API (HasBlockIO)


{-# SPECIALISE
  writeWriteBuffer ::
       HasFS IO h
    -> HasBlockIO IO h
    -> WriteBufferFsPaths
    -> WriteBuffer
    -> Ref (WriteBufferBlobs IO h)
    -> IO ()
  #-}
-- | Write a 'WriteBuffer' to disk.
--
-- A fresh 'WriteBufferWriter' is created with 'new', every key\/operation pair
-- of the buffer (in the order produced by 'WB.toList') is added to it with
-- 'addKeyOp', and the writer is then finalised with cache dropping enabled.
-- The tuple returned by 'unsafeFinalise' is discarded.
writeWriteBuffer ::
     (MonadSTM m, MonadST m, MonadThrow m)
  => HasFS m h
  -> HasBlockIO m h
  -> WriteBufferFsPaths
  -> WriteBuffer
  -> Ref (WriteBufferBlobs m h)
     -- ^ Used to turn each 'BlobSpan' stored in the buffer into a 'RawBlobRef'.
  -> m ()
writeWriteBuffer hfs hbio fsPaths buffer blobs = do
  writer <- new hfs hbio fsPaths
  for_ (WB.toList buffer) $ \(key, op) ->
    -- TODO: The fmap entry here reallocates even when there are no blobs.
    addKeyOp writer key (WBB.mkRawBlobRef blobs <$> op)
  void $ unsafeFinalise True writer

-- | The in-memory representation of an LSM 'WriteBuffer' that is in the
-- process of being serialised to disk.
--
-- All fields are strict.
data WriteBufferWriter m h = WriteBufferWriter
  { -- | The file system paths for all the files used by the serialised write buffer.
    writerFsPaths    :: !WriteBufferFsPaths,
    -- | The page accumulator.
    writerPageAcc    :: !(PageAcc (PrimState m)),
    -- | The byte offset within the blob file for the next blob to be written.
    writerBlobOffset :: !(PrimVar (PrimState m) Word64),
    -- | The (write mode) file handle for the key\/operations file.
    writerKOpsHandle :: !(ForKOps (ChecksumHandle (PrimState m) h)),
    -- | The (write mode) file handle for the blob file.
    writerBlobHandle :: !(ForBlob (ChecksumHandle (PrimState m) h)),
    -- | The file system interface used for all file operations of the writer.
    writerHasFS      :: !(HasFS m h),
    -- | The block I\/O interface, used when dropping files from the cache.
    writerHasBlockIO :: !(HasBlockIO m h)
  }

{-# SPECIALISE
  new ::
       HasFS IO h
    -> HasBlockIO IO h
    -> WriteBufferFsPaths
    -> IO (WriteBufferWriter IO h)
  #-}
-- | Create a 'WriteBufferWriter' to start serialising a 'WriteBuffer'.
--
-- The writer starts with a new page accumulator, a blob offset of @0@, and
-- one 'ChecksumHandle' each for the key\/operations file
-- ('writeBufferKOpsPath') and the blob file ('writeBufferBlobPath').
--
-- See 'Database.LSMTree.Internal.RunBuilder.new'.
--
-- NOTE: 'new' assumes that the directory passed via 'WriteBufferFsPaths' exists.
new ::
     (MonadST m, MonadSTM m)
  => HasFS m h
  -> HasBlockIO m h
  -> WriteBufferFsPaths
  -> m (WriteBufferWriter m h)
new hfs hbio fsPaths = do
  writerPageAcc <- ST.stToIO PageAcc.newPageAcc
  writerBlobOffset <- newPrimVar 0
  writerKOpsHandle <- ForKOps <$> makeHandle hfs (writeBufferKOpsPath fsPaths)
  writerBlobHandle <- ForBlob <$> makeHandle hfs (writeBufferBlobPath fsPaths)
  -- The remaining fields are picked up from the local bindings above via the
  -- record wildcard.
  return WriteBufferWriter
    { writerFsPaths    = fsPaths,
      writerHasFS      = hfs,
      writerHasBlockIO = hbio,
      ..
    }

{-# SPECIALISE
  unsafeFinalise ::
       Bool
    -> WriteBufferWriter IO h
    -> IO (HasFS IO h, HasBlockIO IO h, WriteBufferFsPaths)
  #-}
-- | Finalise an incremental 'WriteBufferWriter'.
--
-- In order, this:
--
-- 1. flushes a partially filled page (if any) to the key\/operations file;
-- 2. reads the checksums of the key\/operations and blob handles and writes
--    them to the checksums file, which is opened with 'FS.MustBeNew';
-- 3. optionally drops the key\/operations and blob files from the cache;
-- 4. closes both handles.
--
-- It returns the file system interface, the block I\/O interface and the
-- paths that the writer was created with.
--
-- Do /not/ use a 'WriteBufferWriter' after finalising it.
--
-- See 'Database.LSMTree.Internal.RunBuilder.unsafeFinalise'.
--
-- TODO: Ensure proper cleanup even in presence of exceptions.
unsafeFinalise ::
     (MonadST m, MonadSTM m, MonadThrow m)
  => Bool -- ^ drop caches
  -> WriteBufferWriter m h
  -> m (HasFS m h, HasBlockIO m h, WriteBufferFsPaths)
unsafeFinalise dropCaches WriteBufferWriter {..} = do
  -- write final bits
  mPage <- ST.stToIO $ flushPageIfNonEmpty writerPageAcc
  for_ mPage $ writeRawPage writerHasFS writerKOpsHandle
  -- The checksums are read only after the last page has been written.
  kOpsChecksum <- traverse readChecksum writerKOpsHandle
  blobChecksum <- traverse readChecksum writerBlobHandle
  let checksums = toChecksumsFileForWriteBufferFiles (kOpsChecksum, blobChecksum)
  FS.withFile writerHasFS (writeBufferChecksumsPath writerFsPaths) (FS.WriteMode FS.MustBeNew) $ \h -> do
    CRC.writeChecksumsFile' writerHasFS h checksums
    FS.hDropCacheAll writerHasBlockIO h
  -- drop the KOps and blobs files from the cache if asked for
  when dropCaches $ do
    dropCache writerHasBlockIO (unForKOps writerKOpsHandle)
    dropCache writerHasBlockIO (unForBlob writerBlobHandle)
  closeHandle writerHasFS (unForKOps writerKOpsHandle)
  closeHandle writerHasFS (unForBlob writerBlobHandle)
  return (writerHasFS, writerHasBlockIO, writerFsPaths)


{-# SPECIALIZE
  addKeyOp ::
       WriteBufferWriter IO h
    -> SerialisedKey
    -> Entry SerialisedValue (RawBlobRef IO h)
    -> IO ()
  #-}
-- | Add a key\/operation pair to the writer.
--
-- Any blob reference in the entry is passed to 'copyBlob' together with the
-- writer's blob offset and blob handle, and is replaced by the 'BlobSpan'
-- that 'copyBlob' returns. The resulting entry is then added to the page
-- accumulator; every page that this completes is written to the
-- key\/operations file. Entries that do not fit in a single page additionally
-- produce overflow pages, which are written right after their page.
--
-- See 'Database.LSMTree.Internal.RunBuilder.addKeyOp'.
addKeyOp ::
     (MonadST m, MonadSTM m, MonadThrow m)
  => WriteBufferWriter m h
  -> SerialisedKey
  -> Entry SerialisedValue (RawBlobRef m h)
  -> m ()
addKeyOp WriteBufferWriter{..} key op = do
  -- TODO: consider optimisation described in 'Database.LSMTree.Internal.RunBuilder.addKeyOp'.
  op' <- traverse (copyBlob writerHasFS writerBlobOffset writerBlobHandle) op
  -- The size check is polymorphic in the blob reference type, so it can be
  -- done on the original entry.
  if PageAcc.entryWouldFitInPage key op
    then do
      mPage <- ST.stToIO $ addSmallKeyOp writerPageAcc key op'
      for_ mPage $ writeRawPage writerHasFS writerKOpsHandle
    else do
      (pages, overflowPages) <- ST.stToIO $ addLargeKeyOp writerPageAcc key op'
      -- TODO: consider optimisation described in 'Database.LSMTree.Internal.RunBuilder.addKeyOp'.
      for_ pages $ writeRawPage writerHasFS writerKOpsHandle
      writeRawOverflowPages writerHasFS writerKOpsHandle overflowPages

-- | Add a key\/operation pair that fits in a single page to the page
-- accumulator.
--
-- Returns @'Just' page@ if the pair did not fit in the current page, in which
-- case the current page was flushed and the pair was added to the now-empty
-- accumulator. Returns 'Nothing' if the pair was added without a flush.
--
-- Precondition (checked by 'assert'): @'PageAcc.entryWouldFitInPage' key op@.
--
-- See 'Database.LSMTree.Internal.RunAcc.addSmallKeyOp'.
addSmallKeyOp ::
     PageAcc s
  -> SerialisedKey
  -> Entry SerialisedValue BlobSpan
  -> ST s (Maybe RawPage)
addSmallKeyOp pageAcc key op =
  assert (PageAcc.entryWouldFitInPage key op) $ do
    pageBoundaryNeeded <-
        -- Try adding the key/op to the page accumulator to see if it fits. If
        -- it does not fit, a page boundary is needed.
        not <$> PageAcc.pageAccAddElem pageAcc key op
    if pageBoundaryNeeded
      then do
        -- We need a page boundary. If the current page is empty then we have
        -- a boundary already, otherwise we need to flush the current page.
        mPage <- flushPageIfNonEmpty pageAcc
        -- The current page is now empty, either because it was already empty
        -- or because we just flushed it. Adding the new key/op to an empty
        -- page must now succeed, because we know it fits in a page.
        added <- PageAcc.pageAccAddElem pageAcc key op
        assert added $ pure mPage
      else do
        pure Nothing

-- | Add a key\/operation pair that does /not/ fit in a single page.
--
-- The current page is flushed if it is non-empty, and the pair is turned into
-- a page of its own plus overflow pages using 'PageAcc.singletonPage'. The
-- returned page list contains the flushed page (if any) followed by the new
-- page.
--
-- Precondition (checked by 'assert'):
-- @'not' ('PageAcc.entryWouldFitInPage' key op)@.
--
-- See 'Database.LSMTree.Internal.RunAcc.addLargeKeyOp'.
addLargeKeyOp ::
     PageAcc s
  -> SerialisedKey
  -> Entry SerialisedValue BlobSpan -- ^ the full value, not just a prefix
  -> ST s ([RawPage], [RawOverflowPage])
addLargeKeyOp pageAcc key op =
  assert (not (PageAcc.entryWouldFitInPage key op)) $ do
    -- If the existing page accumulator is non-empty, we flush it, since the
    -- new large key/op will need more than one page to itself.
    mPagePre <- flushPageIfNonEmpty pageAcc
    -- Make the new page and overflow pages.
    let (page, overflowPages) = PageAcc.singletonPage key op
    -- Combine the results with anything we flushed before
    let !pages = selectPages mPagePre page
    return (pages, overflowPages)

-- | Internal helper. See 'Database.LSMTree.Internal.RunAcc.flushPageIfNonEmpty'.
--
-- If the accumulator holds at least one key, serialise it to a 'RawPage',
-- reset the accumulator and return the page. Otherwise return 'Nothing' and
-- leave the accumulator untouched.
flushPageIfNonEmpty :: PageAcc s -> ST s (Maybe RawPage)
flushPageIfNonEmpty pageAcc = do
    keysCount <- PageAcc.keysCountPageAcc pageAcc
    if keysCount > 0
      then do
        -- Serialise the page and reset the accumulator
        page <- PageAcc.serialisePageAcc pageAcc
        PageAcc.resetPageAcc pageAcc
        pure $ Just page
      else pure Nothing

-- | Internal helper. See 'Database.LSMTree.Internal.RunAcc.selectPagesAndChunks'.
--
-- Returns the optional previously flushed page followed by the new page, so
-- the result always has one or two elements.
selectPages ::
     Maybe RawPage -- ^ page flushed before the new page was created, if any
  -> RawPage       -- ^ the new page
  -> [RawPage]
selectPages mPagePre page =
  maybeToList mPagePre ++ [page]