{-# OPTIONS_HADDOCK not-home #-}

-- | A mutable run ('RunBuilder') that is under construction.
--
module Database.LSMTree.Internal.RunBuilder (
    RunBuilder (..)
  , RunParams (..)
  , RunDataCaching (..)
  , RunBloomFilterAlloc (..)
  , IndexType (..)
  , new
  , addKeyOp
  , addLargeSerialisedKeyOp
  , unsafeFinalise
  , close
  ) where

import           Control.DeepSeq (NFData (..))
import           Control.Monad (when)
import           Control.Monad.Class.MonadST (MonadST (..))
import qualified Control.Monad.Class.MonadST as ST
import           Control.Monad.Class.MonadSTM (MonadSTM (..))
import           Control.Monad.Class.MonadThrow (MonadThrow)
import           Control.Monad.Primitive
import           Data.BloomFilter (Bloom)
import           Data.Foldable (for_, traverse_)
import           Data.Primitive.PrimVar
import           Data.Word (Word64)
import           Database.LSMTree.Internal.BlobRef (RawBlobRef)
import           Database.LSMTree.Internal.ChecksumHandle
import qualified Database.LSMTree.Internal.CRC32C as CRC
import           Database.LSMTree.Internal.Entry
import           Database.LSMTree.Internal.Index (Index, IndexType (..))
import           Database.LSMTree.Internal.Paths
import           Database.LSMTree.Internal.RawOverflowPage (RawOverflowPage)
import           Database.LSMTree.Internal.RawPage (RawPage)
import           Database.LSMTree.Internal.RunAcc (RunAcc,
                     RunBloomFilterAlloc (..))
import qualified Database.LSMTree.Internal.RunAcc as RunAcc
import           Database.LSMTree.Internal.Serialise
import qualified System.FS.API as FS
import           System.FS.API (HasFS)
import qualified System.FS.BlockIO.API as FS
import           System.FS.BlockIO.API (HasBlockIO)

-- | The in-memory representation of an LSM run that is under construction.
-- This is the output sink for two key algorithms: 1. writing out the write
-- buffer, and 2. incrementally merging two or more runs.
--
-- It contains open file handles for all four files used in the disk
-- representation of a run. Each file handle is opened write-only and should be
-- written to using normal buffered I\/O.
--
-- __Not suitable for concurrent construction from multiple threads!__
--
data RunBuilder m h = RunBuilder {
      -- | The parameters the run is being built with.
      runBuilderParams     :: !RunParams

      -- | The file system paths for all the files used by the run.
    , runBuilderFsPaths    :: !RunFsPaths

      -- | The run accumulator. This is the representation used for the
      -- morally pure subset of the run construction functionality. In
      -- particular it contains the (mutable) index, bloom filter and buffered
      -- pending output for the key\/ops file.
    , runBuilderAcc        :: !(RunAcc (PrimState m))

      -- | The byte offset within the blob file for the next blob to be written.
    , runBuilderBlobOffset :: !(PrimVar (PrimState m) Word64)

      -- | The (write mode) file handles.
    , runBuilderHandles    :: {-# UNPACK #-} !(ForRunFiles (ChecksumHandle (PrimState m) h))

      -- | The file system interface used for all writes to the run files.
    , runBuilderHasFS      :: !(HasFS m h)

      -- | The block I\/O interface, used to drop files from the page cache
      -- when the run is finalised.
    , runBuilderHasBlockIO :: !(HasBlockIO m h)
    }

-- | The parameters a run is built with. They are fixed when the
-- 'RunBuilder' is created and handed back unchanged by 'unsafeFinalise'.
data RunParams = RunParams {
       -- | Whether the key\/ops and blob files are kept in the page cache
       -- once the run has been finalised.
       runParamCaching :: !RunDataCaching,
       -- | Bloom filter allocation, passed on to the run accumulator.
       runParamAlloc   :: !RunBloomFilterAlloc,
       -- | The type of index to build; passed on to the run accumulator and
       -- used when writing the index file header.
       runParamIndex   :: !IndexType
     }
  deriving stock (Eq, Show)

-- | Fully evaluates each of the three parameters in turn.
instance NFData RunParams where
  rnf (RunParams a b c) = rnf a `seq` rnf b `seq` rnf c

-- | Should this run cache key\/ops data in memory?
--
-- With 'NoCacheRunData', 'unsafeFinalise' additionally drops the key\/ops and
-- blob files from the page cache.
data RunDataCaching = CacheRunData | NoCacheRunData
  deriving stock (Show, Eq)

-- | Both constructors are nullary, so matching on them is full evaluation.
instance NFData RunDataCaching where
  rnf CacheRunData   = ()
  rnf NoCacheRunData = ()

{-# SPECIALISE new ::
     HasFS IO h
  -> HasBlockIO IO h
  -> RunParams
  -> RunFsPaths
  -> NumEntries
  -> IO (RunBuilder IO h) #-}
-- | Create a 'RunBuilder' to start building a run.
--
-- This allocates a fresh run accumulator, starts the blob offset at 0, creates
-- a checksummed handle for each of the run's files and writes the header of
-- the index file.
--
-- NOTE: 'new' assumes that 'runDir' that the run is created in exists.
new ::
     (MonadST m, MonadSTM m)
  => HasFS m h
  -> HasBlockIO m h
  -> RunParams
  -> RunFsPaths
  -> NumEntries  -- ^ an upper bound of the number of entries to be added
  -> m (RunBuilder m h)
new hfs hbio runBuilderParams@RunParams{..} runBuilderFsPaths numEntries = do
    runBuilderAcc <- ST.stToIO $
                       RunAcc.new numEntries runParamAlloc runParamIndex
    runBuilderBlobOffset <- newPrimVar 0

    runBuilderHandles <-
      traverse (makeHandle hfs) (pathsForRunFiles runBuilderFsPaths)

    -- The remaining fields are picked up from the local bindings above.
    let builder = RunBuilder { runBuilderHasFS = hfs
                             , runBuilderHasBlockIO = hbio
                             , ..
                             }
    writeIndexHeader hfs (forRunIndex runBuilderHandles) runParamIndex
    return builder

{-# SPECIALISE addKeyOp ::
     RunBuilder IO h
  -> SerialisedKey
  -> Entry SerialisedValue (RawBlobRef IO h)
  -> IO () #-}
-- | Add a key\/op pair.
--
-- In the 'InsertWithBlob' case, the 'RawBlobRef' identifies where the blob can be
-- found (which is either from a write buffer or another run). The blobs will
-- be copied from their existing blob file into the new run's blob file.
--
-- Use only for entries that are fully in-memory (other than any blob).
-- To handle larger-than-page values in a chunked style during run merging,
-- use 'addLargeSerialisedKeyOp'.
--
-- The k\/ops and the primary array of the index get written incrementally,
-- everything else only at the end when 'unsafeFinalise' is called.
--
addKeyOp ::
     (MonadST m, MonadSTM m, MonadThrow m)
  => RunBuilder m h
  -> SerialisedKey
  -> Entry SerialisedValue (RawBlobRef m h)
  -> m ()
addKeyOp RunBuilder{..} key op = do
    -- TODO: the traverse here reallocates even when there are no blobs.
    -- We need the Entry _ BlobSpan for RunAcc.add{Small,Large}KeyOp
    -- Perhaps pass the optional blob span separately from the Entry.
    op' <- traverse
             (copyBlob runBuilderHasFS runBuilderBlobOffset
                       (forRunBlob runBuilderHandles))
             op
    if RunAcc.entryWouldFitInPage key op'
      then do
        -- Small entry: the accumulator may or may not emit a completed page
        -- (and possibly an index chunk along with it).
        mpagemchunk <- ST.stToIO $ RunAcc.addSmallKeyOp runBuilderAcc key op'
        case mpagemchunk of
          Nothing -> return ()
          Just (page, mchunk) -> do
            writeRawPage runBuilderHasFS (forRunKOps runBuilderHandles) page
            for_ mchunk $
              writeIndexChunk runBuilderHasFS (forRunIndex runBuilderHandles)

      else do
        -- Large entry: write out the pages, overflow pages and index chunks
        -- that the accumulator hands back.
        (pages, overflowPages, chunks)
          <- ST.stToIO $ RunAcc.addLargeKeyOp runBuilderAcc key op'
        --TODO: consider optimisation: use writev to write all pages in one go
        for_ pages $
          writeRawPage runBuilderHasFS (forRunKOps runBuilderHandles)
        writeRawOverflowPages runBuilderHasFS (forRunKOps runBuilderHandles)
                              overflowPages
        for_ chunks $
          writeIndexChunk runBuilderHasFS (forRunIndex runBuilderHandles)

{-# SPECIALISE addLargeSerialisedKeyOp ::
     RunBuilder IO h
  -> SerialisedKey
  -> RawPage
  -> [RawOverflowPage]
  -> IO () #-}
-- | See 'RunAcc.addLargeSerialisedKeyOp' for details.
--
-- The pages and overflow pages returned by the accumulator are written to the
-- key\/ops file, and the returned index chunks to the index file.
--
addLargeSerialisedKeyOp ::
     (MonadST m, MonadSTM m)
  => RunBuilder m h
  -> SerialisedKey
  -> RawPage
  -> [RawOverflowPage]
  -> m ()
addLargeSerialisedKeyOp RunBuilder{..} key page overflowPages = do
    (pages, overflowPages', chunks)
      <- ST.stToIO $
           RunAcc.addLargeSerialisedKeyOp runBuilderAcc key page overflowPages
    for_ pages $
      writeRawPage runBuilderHasFS (forRunKOps runBuilderHandles)
    writeRawOverflowPages runBuilderHasFS (forRunKOps runBuilderHandles)
                          overflowPages'
    for_ chunks $
      writeIndexChunk runBuilderHasFS (forRunIndex runBuilderHandles)

{-# SPECIALISE unsafeFinalise ::
     RunBuilder IO h
  -> IO (HasFS IO h, HasBlockIO IO h,
         RunFsPaths, Bloom SerialisedKey, Index,
         RunParams, NumEntries) #-}
-- | Finish construction of the run.
-- Writes the filter and index to file and leaves all written files on disk.
--
-- In order, this flushes the accumulator's last page and index chunk, writes
-- the final part of the index and the bloom filter, writes the checksums file
-- (which must not exist yet), drops files from the page cache and closes all
-- run file handles.
--
-- __Do not use the 'RunBuilder' after calling this function!__
--
-- TODO: Ensure proper cleanup even in presence of exceptions.
unsafeFinalise ::
     (MonadST m, MonadSTM m, MonadThrow m)
  => RunBuilder m h
  -> m (HasFS m h, HasBlockIO m h,
        RunFsPaths, Bloom SerialisedKey, Index,
        RunParams, NumEntries)
-- TODO: consider introducing a type for this big tuple
unsafeFinalise RunBuilder {..} = do
    -- write final bits
    (mPage, mChunk, runFilter, runIndex, numEntries) <-
      ST.stToIO (RunAcc.unsafeFinalise runBuilderAcc)
    for_ mPage $
      writeRawPage runBuilderHasFS (forRunKOps runBuilderHandles)
    for_ mChunk $
      writeIndexChunk runBuilderHasFS (forRunIndex runBuilderHandles)
    writeIndexFinal runBuilderHasFS (forRunIndex runBuilderHandles)
                    numEntries runIndex
    writeFilter runBuilderHasFS (forRunFilter runBuilderHandles) runFilter
    -- write checksums
    checksums <- toChecksumsFile <$> traverse readChecksum runBuilderHandles
    FS.withFile runBuilderHasFS (runChecksumsPath runBuilderFsPaths)
                (FS.WriteMode FS.MustBeNew) $ \h -> do
      CRC.writeChecksumsFile' runBuilderHasFS h checksums
      -- always drop the checksum file from the cache
      FS.hDropCacheAll runBuilderHasBlockIO h
    -- always drop filter and index files from the cache
    dropCache runBuilderHasBlockIO (forRunFilterRaw runBuilderHandles)
    dropCache runBuilderHasBlockIO (forRunIndexRaw runBuilderHandles)
    -- drop the KOps and blobs files from the cache if asked for
    when (runParamCaching runBuilderParams == NoCacheRunData) $ do
      dropCache runBuilderHasBlockIO (forRunKOpsRaw runBuilderHandles)
      dropCache runBuilderHasBlockIO (forRunBlobRaw runBuilderHandles)
    mapM_ (closeHandle runBuilderHasFS) runBuilderHandles
    return (runBuilderHasFS, runBuilderHasBlockIO,
            runBuilderFsPaths, runFilter, runIndex,
            runBuilderParams, numEntries)

{-# SPECIALISE close :: RunBuilder IO h -> IO () #-}
-- | Close a run that is being constructed (has not been finalised yet),
-- removing all files associated with it from disk.
-- After calling this operation, the run must not be used anymore.
--
-- All run file handles are closed first; the files at the run's paths are
-- removed afterwards.
--
-- TODO: Ensure proper cleanup even in presence of exceptions.
close :: MonadSTM m => RunBuilder m h -> m ()
close RunBuilder {..} = do
    traverse_ (closeHandle runBuilderHasFS) runBuilderHandles
    traverse_ (FS.removeFile runBuilderHasFS)
              (pathsForRunFiles runBuilderFsPaths)