{-# LANGUAGE CPP           #-}
{-# LANGUAGE MagicHash     #-}
{-# LANGUAGE UnboxedTuples #-}
{-# OPTIONS_HADDOCK not-home #-}

#if !(MIN_VERSION_GLASGOW_HASKELL(9,0,0,0))
-- Fix for ghc 8.10.x with deriving newtype Prim
{-# LANGUAGE DataKinds     #-}
#endif

-- TODO: establish that this implementation matches up with the ScheduledMerges
-- prototype. See lsm-tree#445.
module Database.LSMTree.Internal.MergeSchedule (
    -- * Traces
    AtLevel (..)
  , MergeTrace (..)
    -- * Table content
  , TableContent (..)
  , duplicateTableContent
  , releaseTableContent
    -- * Levels cache
  , LevelsCache (..)
  , mkLevelsCache
    -- * Levels, runs and ongoing merges
  , Levels
  , Level (..)
  , MergePolicyForLevel (..)
  , mergingRunParamsForLevel
    -- * Union level
  , UnionLevel (..)
    -- * Flushes and scheduled merges
  , updatesWithInterleavedFlushes
  , flushWriteBuffer
    -- * Exported for cabal-docspec
  , maxRunSize
    -- * Credits
  , MergeDebt (..)
  , MergeCredits (..)
  , supplyCredits
  , NominalDebt (..)
  , NominalCredits (..)
  , nominalDebtAsCredits
  , nominalDebtForLevel
    -- * Exported for testing
  , addWriteBufferEntries
  ) where

import           Control.ActionRegistry
import           Control.Concurrent.Class.MonadMVar.Strict
import           Control.Monad.Class.MonadST (MonadST)
import           Control.Monad.Class.MonadSTM (MonadSTM (..))
import           Control.Monad.Class.MonadThrow (MonadMask, MonadThrow (..))
import           Control.Monad.Primitive
import           Control.RefCount
import           Control.Tracer
import           Data.BloomFilter (Bloom)
import           Data.Foldable (fold)
import qualified Data.Vector as V
import           Database.LSMTree.Internal.Assertions (assert)
import           Database.LSMTree.Internal.Config
import           Database.LSMTree.Internal.Entry (Entry, NumEntries (..),
                     unNumEntries)
import           Database.LSMTree.Internal.IncomingRun
import           Database.LSMTree.Internal.Index (Index)
import           Database.LSMTree.Internal.Lookup (ResolveSerialisedValue)
import           Database.LSMTree.Internal.MergingRun (MergeCredits (..),
                     MergeDebt (..), MergingRun, RunParams (..))
import qualified Database.LSMTree.Internal.MergingRun as MR
import           Database.LSMTree.Internal.MergingTree (MergingTree)
import           Database.LSMTree.Internal.Paths (ActiveDir, RunFsPaths (..),
                     SessionRoot)
import qualified Database.LSMTree.Internal.Paths as Paths
import           Database.LSMTree.Internal.Run (Run)
import qualified Database.LSMTree.Internal.Run as Run
import           Database.LSMTree.Internal.RunNumber
import           Database.LSMTree.Internal.Serialise (SerialisedBlob,
                     SerialisedKey, SerialisedValue)
import           Database.LSMTree.Internal.UniqCounter
import           Database.LSMTree.Internal.Vector (forMStrict, mapStrict)
import           Database.LSMTree.Internal.WriteBuffer (WriteBuffer)
import qualified Database.LSMTree.Internal.WriteBuffer as WB
import           Database.LSMTree.Internal.WriteBufferBlobs (WriteBufferBlobs)
import qualified Database.LSMTree.Internal.WriteBufferBlobs as WBB
import qualified System.FS.API as FS
import           System.FS.API (HasFS)
import           System.FS.BlockIO.API (HasBlockIO)

{-------------------------------------------------------------------------------
  Traces
-------------------------------------------------------------------------------}

-- | A value tagged with the 'LevelNo' it relates to. In this module it is used
-- to tag 'MergeTrace' events (see the @Tracer m (AtLevel MergeTrace)@
-- arguments).
data AtLevel a = AtLevel LevelNo a
  deriving stock Show

-- | Trace events related to write buffer flushes and merges of runs.
data MergeTrace =
    TraceFlushWriteBuffer
      NumEntries -- ^ Size of the write buffer
      RunNumber
      RunParams
  | TraceAddLevel
  | TraceAddRun
      RunNumber -- ^ newly added run
      (V.Vector RunNumber) -- ^ resident runs
  | TraceNewMerge
      (V.Vector NumEntries) -- ^ Sizes of input runs
      RunNumber
      RunParams
      MergePolicyForLevel
      MR.LevelMergeType
  | TraceNewMergeSingleRun
      NumEntries -- ^ Size of run
      RunNumber
  | TraceCompletedMerge  -- TODO: currently not traced for Incremental merges
      NumEntries -- ^ Size of output run
      RunNumber
    -- | This is traced at the latest point the merge could complete.
  | TraceExpectCompletedMerge
      RunNumber
  deriving stock Show

{-------------------------------------------------------------------------------
  Table content
-------------------------------------------------------------------------------}

-- | The levels of the table, from most to least recently inserted.
--
-- Concurrency: read-only operations are allowed to be concurrent with each
-- other, but update operations must not be concurrent with each other or read
-- operations. For example, inspecting the levels cache can be done
-- concurrently, but 'updatesWithInterleavedFlushes' must be serialised.
--
data TableContent m h = TableContent {
    -- | The in-memory level 0 of the table
    --
    -- TODO: probably less allocation to make this a MutVar
    tableWriteBuffer      :: !WriteBuffer
    -- | The blob storage for entries in the write buffer
  , tableWriteBufferBlobs :: !(Ref (WriteBufferBlobs m h))
    -- | A hierarchy of \"regular\" on-disk levels numbered 1 and up. Note that
    -- vector index @n@ refers to level @n+1@.
  , tableLevels           :: !(Levels m h)
    -- | Cache of flattened regular levels (see 'tableLevels').
  , tableCache            :: !(LevelsCache m h)
    -- | An optional final union level, not included in the table cache.
  , tableUnionLevel       :: !(UnionLevel m h)
  }

{-# SPECIALISE duplicateTableContent :: ActionRegistry IO -> TableContent IO h -> IO (TableContent IO h) #-}
-- | Create a copy of the table content that holds its own references.
--
-- The write buffer is a plain value and is reused as-is. The write buffer
-- blobs reference is duplicated with 'dupRef' under 'withRollback', using
-- 'releaseRef' as the rollback action. The levels, the levels cache and the
-- union level are duplicated by their respective @duplicate*@ functions, all
-- using the same action registry.
duplicateTableContent ::
     (PrimMonad m, MonadMask m)
  => ActionRegistry m
  -> TableContent m h
  -> m (TableContent m h)
duplicateTableContent reg (TableContent wb wbb levels cache ul) = do
    wbb'    <- withRollback reg (dupRef wbb) releaseRef
    levels' <- duplicateLevels reg levels
    cache'  <- duplicateLevelsCache reg cache
    ul'     <- duplicateUnionLevel reg ul
    -- Force the new record (and thereby its strict fields) before returning.
    return $! TableContent wb wbb' levels' cache' ul'

{-# SPECIALISE releaseTableContent :: ActionRegistry IO -> TableContent IO h -> IO () #-}
-- | Release all references held by the table content.
--
-- The release of the write buffer blobs reference is registered with the
-- action registry through 'delayedCommit'. The levels, the levels cache and
-- the union level are released by their respective @release*@ functions,
-- using the same registry. The write buffer itself holds no reference and is
-- ignored.
releaseTableContent ::
     (PrimMonad m, MonadMask m)
  => ActionRegistry m
  -> TableContent m h
  -> m ()
releaseTableContent reg (TableContent _wb wbb levels cache ul) = do
    delayedCommit reg (releaseRef wbb)
    releaseLevels reg levels
    releaseLevelsCache reg cache
    releaseUnionLevel reg ul

{-------------------------------------------------------------------------------
  Levels cache
-------------------------------------------------------------------------------}

-- | Flattened cache of the runs that are referenced by a table.
--
-- This cache includes a vector of runs, but also vectors of the runs broken
-- down into components, like bloom filters, fence pointer indexes and file
-- handles. This allows for quick access in the lookup code. Recomputing this
-- cache should be relatively rare.
--
-- A cache takes references to its runs on construction, and it releases each
-- reference when the cache is invalidated. This is done so that incremental
-- merges can remove references for their input runs when a merge completes,
-- without closing runs that might be in use for other operations such as
-- lookups. This does mean that a cache can keep runs open for longer than
-- necessary, so caches should be rebuilt using, e.g., 'rebuildCache', in a
-- timely manner.
data LevelsCache m h = LevelsCache_ {
    cachedRuns      :: !(V.Vector (Ref (Run m h)))
  , cachedFilters   :: !(V.Vector (Bloom SerialisedKey))
  , cachedIndexes   :: !(V.Vector Index)
  , cachedKOpsFiles :: !(V.Vector (FS.Handle h))
  }

{-# SPECIALISE mkLevelsCache ::
     ActionRegistry IO
  -> Levels IO h
  -> IO (LevelsCache IO h) #-}
-- | Flatten the argument 'Level's into a single vector of runs, including all
-- runs that are inputs to an ongoing merge. Use that to populate the
-- 'LevelsCache'. The cache will take a reference for each of its runs.
--
-- Within each level, the runs of the incoming run come first, followed by the
-- level's resident runs.
mkLevelsCache ::
     forall m h. (PrimMonad m, MonadMVar m, MonadMask m)
  => ActionRegistry m
  -> Levels m h
  -> m (LevelsCache m h)
mkLevelsCache reg lvls = do
    rs <- foldRunAndMergeM
      (fmap V.singleton . dupRun)
      (\mr -> withRollback reg (MR.duplicateRuns mr) (V.mapM_ releaseRef))
      lvls
    -- The component vectors are all derived from the same vector of runs, so
    -- they have the same length and ordering as 'cachedRuns'.
    pure $! LevelsCache_ {
        cachedRuns      = rs
      , cachedFilters   = mapStrict (\(DeRef r) -> Run.runFilter   r) rs
      , cachedIndexes   = mapStrict (\(DeRef r) -> Run.runIndex    r) rs
      , cachedKOpsFiles = mapStrict (\(DeRef r) -> Run.runKOpsFile r) rs
      }
  where
    -- Take a fresh reference to a single run, with 'releaseRef' as the
    -- rollback action.
    dupRun r = withRollback reg (dupRef r) releaseRef

    -- TODO: this is not terribly performant, but it is also not sure if we are
    -- going to need this in the end. We might get rid of the LevelsCache.
    --
    -- Monoidal fold over all levels: @k1@ is applied to plain runs (a single
    -- incoming run and every resident run), @k2@ to the merging run of an
    -- incoming merge.
    foldRunAndMergeM ::
         Monoid a
      => (Ref (Run m h) -> m a)
      -> (Ref (MergingRun MR.LevelMergeType m h) -> m a)
      -> Levels m h
      -> m a
    foldRunAndMergeM k1 k2 ls =
        fmap fold $ forMStrict ls $ \(Level ir rs) -> do
          incoming <- case ir of
            Single         r -> k1 r
            Merging _ _ _ mr -> k2 mr
          (incoming <>) . fold <$> V.forM rs k1

{-# SPECIALISE rebuildCache ::
     ActionRegistry IO
  -> LevelsCache IO h
  -> Levels IO h
  -> IO (LevelsCache IO h) #-}
-- | Remove references to runs in the old cache, and create a new cache with
-- fresh references taken for the runs in the new levels.
--
-- TODO: caches are currently only rebuilt in flushWriteBuffer. If an
-- OngoingMerge is completed, then tables will only rebuild the cache, and
-- therefore release "old" runs, when a flush is initiated. This is sub-optimal,
-- and there are at least two solutions, but it is unclear which is faster or
-- more convenient.
--
-- * Get rid of the cache entirely, and have each batch of lookups take
--   references for runs in the levels structure.
--
-- * Keep the cache feature, but force a rebuild every once in a while, e.g.,
--   once in every 100 lookups.
--
-- TODO: rebuilding the cache can invalidate blob references if the cache was
-- holding the last reference to a run. This is not really a problem of just the
-- caching approach, but of allowing merges to finish early. We should come up
-- with a solution to keep blob references valid until the next /update/ comes
-- along. Lookups should not invalidate blob references.
rebuildCache ::
     (PrimMonad m, MonadMVar m, MonadMask m)
  => ActionRegistry m
  -> LevelsCache m h -- ^ old cache
  -> Levels m h -- ^ new levels
  -> m (LevelsCache m h) -- ^ new cache
rebuildCache reg oldCache newLevels = do
    releaseLevelsCache reg oldCache
    mkLevelsCache reg newLevels

{-# SPECIALISE duplicateLevelsCache ::
     ActionRegistry IO
  -> LevelsCache IO h
  -> IO (LevelsCache IO h) #-}
-- | Duplicate a cache by taking a fresh reference for each cached run. Each
-- 'dupRef' is performed under 'withRollback', with 'releaseRef' as the
-- rollback action.
--
-- Only 'cachedRuns' is replaced; the other fields (filters, indexes, file
-- handles) are carried over unchanged from the argument cache.
duplicateLevelsCache ::
     (PrimMonad m, MonadMask m)
  => ActionRegistry m
  -> LevelsCache m h
  -> m (LevelsCache m h)
duplicateLevelsCache reg cache = do
    rs' <- forMStrict (cachedRuns cache) $ \r ->
             withRollback reg (dupRef r) releaseRef
    return cache { cachedRuns = rs' }

{-# SPECIALISE releaseLevelsCache ::
     ActionRegistry IO
  -> LevelsCache IO h
  -> IO () #-}
-- | Release the cache's reference to each of its runs. Every 'releaseRef' is
-- registered with the action registry through 'delayedCommit' rather than
-- being run directly here.
releaseLevelsCache ::
     (PrimMonad m, MonadMask m)
  => ActionRegistry m
  -> LevelsCache m h
  -> m ()
releaseLevelsCache reg cache =
    V.forM_ (cachedRuns cache) $ \r ->
      delayedCommit reg (releaseRef r)

{-------------------------------------------------------------------------------
  Levels
-------------------------------------------------------------------------------}

-- | The regular levels of a table, as a vector of 'Level's. Vector index @n@
-- holds level number @n+1@.
type Levels m h = V.Vector (Level m h)

-- | A level is a sequence of resident runs at this level, prefixed by an
-- incoming run, which is usually multiple runs that are being merged. Once
-- completed, the resulting run will become a resident run at this level.
data Level m h = Level {
    -- | The run that is incoming to this level: either a single run or an
    -- ongoing merge.
    incomingRun  :: !(IncomingRun m h)
    -- | The runs that are resident at this level.
  , residentRuns :: !(V.Vector (Ref (Run m h)))
  }

{-# SPECIALISE duplicateLevels :: ActionRegistry IO -> Levels IO h -> IO (Levels IO h) #-}
-- | Duplicate all levels, taking fresh references for the incoming run and
-- for every resident run of each level.
--
-- Each duplication is performed under 'withRollback', with the matching
-- release function ('releaseIncomingRun' or 'releaseRef') as the rollback
-- action.
duplicateLevels ::
     (PrimMonad m, MonadMask m)
  => ActionRegistry m
  -> Levels m h
  -> m (Levels m h)
duplicateLevels reg levels =
    forMStrict levels $ \Level {incomingRun, residentRuns} -> do
      incomingRun'  <- withRollback reg (duplicateIncomingRun incomingRun) releaseIncomingRun
      residentRuns' <- forMStrict residentRuns $ \r ->
                         withRollback reg (dupRef r) releaseRef
      return $! Level {
        incomingRun  = incomingRun',
        residentRuns = residentRuns'
      }

{-# SPECIALISE releaseLevels :: ActionRegistry IO -> Levels IO h -> IO () #-}
-- | Release the incoming run and all resident runs of every level. Each
-- release action is registered with the action registry through
-- 'delayedCommit' rather than being run directly here.
releaseLevels ::
     (PrimMonad m, MonadMask m)
  => ActionRegistry m
  -> Levels m h
  -> m ()
releaseLevels reg levels =
    V.forM_ levels $ \Level {incomingRun, residentRuns} -> do
      delayedCommit reg (releaseIncomingRun incomingRun)
      V.mapM_ (delayedCommit reg . releaseRef) residentRuns

{-# SPECIALISE iforLevelM_ :: Levels IO h -> (LevelNo -> Level IO h -> IO ()) -> IO () #-}
-- | Run an action for each level, passing the level's number along with the
-- level. Level numbers start at 1: the level at vector index @i@ is passed as
-- @LevelNo (i + 1)@.
iforLevelM_ :: Monad m => Levels m h -> (LevelNo -> Level m h -> m ()) -> m ()
iforLevelM_ lvls k = V.iforM_ lvls $ \i lvl -> k (LevelNo (i + 1)) lvl

{-------------------------------------------------------------------------------
  Union level
-------------------------------------------------------------------------------}

-- | An additional optional last level, created as a result of
-- 'Database.LSMTree.Monoidal.union'. It can not only contain an ongoing merge
-- of multiple runs, but a nested tree of merges.
--
-- TODO: So far, this is
-- * not considered when creating cursors (also used for range lookups)
-- * never merged into the regular levels
data UnionLevel m h =
    -- | The table has no union level.
    NoUnion
    -- | The table has a union level, held as a reference to a 'MergingTree'.
  | Union !(Ref (MergingTree m h))

{-# SPECIALISE duplicateUnionLevel ::
     ActionRegistry IO
  -> UnionLevel IO h
  -> IO (UnionLevel IO h) #-}
-- | Duplicate the union level. 'NoUnion' is returned unchanged. For 'Union',
-- a fresh reference to the merging tree is taken with 'dupRef' under
-- 'withRollback', with 'releaseRef' as the rollback action.
duplicateUnionLevel ::
     (PrimMonad m, MonadMask m)
  => ActionRegistry m
  -> UnionLevel m h
  -> m (UnionLevel m h)
duplicateUnionLevel reg ul =
    case ul of
      NoUnion    -> return ul
      Union tree -> Union <$> withRollback reg (dupRef tree) releaseRef

{-# SPECIALISE releaseUnionLevel ::
     ActionRegistry IO
  -> UnionLevel IO h
  -> IO () #-}
-- | Release the reference held by a union level, if any.
--
-- * For 'NoUnion' this does nothing.
--
-- * For @'Union' tree@, the 'releaseRef' of the merging tree is not run
--   directly, but handed to the registry via 'delayedCommit'.
--   NOTE(review): presumably the release then only happens when the registry
--   is committed -- confirm against "Control.ActionRegistry".
releaseUnionLevel ::
     (PrimMonad m, MonadMask m)
  => ActionRegistry m
  -> UnionLevel m h
  -> m ()
releaseUnionLevel _   NoUnion      = return ()
releaseUnionLevel reg (Union tree) = delayedCommit reg (releaseRef tree)

{-------------------------------------------------------------------------------
  Flushes and scheduled merges
-------------------------------------------------------------------------------}

{-# SPECIALISE updatesWithInterleavedFlushes ::
     Tracer IO (AtLevel MergeTrace)
  -> TableConfig
  -> ResolveSerialisedValue
  -> HasFS IO h
  -> HasBlockIO IO h
  -> SessionRoot
  -> UniqCounter IO
  -> V.Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)
  -> ActionRegistry IO
  -> TableContent IO h
  -> IO (TableContent IO h) #-}
-- | A single batch of updates can fill up the write buffer multiple times. We
-- flush the write buffer each time it fills up before trying to fill it up
-- again.
--
-- Each iteration adds as many entries as fit (see 'addWriteBufferEntries'),
-- supplies credits to the levels, and then either returns (buffer not full) or
-- flushes with 'flushWriteBuffer' and recurses on the remaining entries.
--
-- TODO: in practice the size of a batch will be much smaller than the maximum
-- size of the write buffer, so we should optimise for the case that small
-- batches are inserted. Ideas:
--
-- * we can allow a range of sizes to flush to disk rather than just the max size
--
-- * could do a map bulk merge rather than sequential insert, on the prefix of
--   the batch that's guaranteed to fit
--
-- * or flush the existing buffer if we would expect the next batch to cause the
--   buffer to become too large
--
-- TODO: we could also optimise for the case where the write buffer is small
-- compared to the size of the batch, but it is less critical. In particular, in
-- case the write buffer is empty, or if it fills up multiple times for a single
-- batch of updates, we might be able to skip adding entries to the write buffer
-- for a large part. When the write buffer is empty, we can sort and deduplicate
-- the vector of updates directly, slice it up into properly sized sub-vectors,
-- and write those to disk. Of course, any remainder that did not fit into a
-- whole run should then end up in a fresh write buffer.
updatesWithInterleavedFlushes ::
     forall m h.
     (MonadMask m, MonadMVar m, MonadSTM m, MonadST m)
  => Tracer m (AtLevel MergeTrace)
  -> TableConfig
  -> ResolveSerialisedValue
  -> HasFS m h
  -> HasBlockIO m h
  -> SessionRoot
  -> UniqCounter m
  -> V.Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)
  -> ActionRegistry m
  -> TableContent m h
  -> m (TableContent m h)
updatesWithInterleavedFlushes tr conf resolve hfs hbio root uc es reg tc = do
    let wb = tableWriteBuffer tc
        wbblobs = tableWriteBufferBlobs tc
    -- Fill the write buffer up to capacity; @es'@ are the entries left over.
    (wb', es') <- addWriteBufferEntries hfs resolve wbblobs maxn wb es
    -- Supply credits before flushing, so that we complete merges in time. The
    -- number of supplied credits is based on the size increase of the write
    -- buffer, not the number of processed entries @length es' - length es@.
    let numAdded = unNumEntries (WB.numEntries wb') - unNumEntries (WB.numEntries wb)
    supplyCredits conf (NominalCredits numAdded) (tableLevels tc)
    let tc' = tc { tableWriteBuffer = wb' }
    if WB.numEntries wb' < maxn then do
      pure $! tc'
    -- If the write buffer did reach capacity, then we flush.
    else do
      tc'' <- flushWriteBuffer tr conf resolve hfs hbio root uc reg tc'
      -- In the fortunate case where we have already performed all the updates,
      -- return,
      if V.null es' then
        pure $! tc''
      -- otherwise, keep going
      else
        updatesWithInterleavedFlushes tr conf resolve hfs hbio root uc es' reg tc''
  where
    -- The write buffer capacity, in number of entries, taken from the config.
    AllocNumEntries maxn = confWriteBufferAlloc conf

{-# SPECIALISE addWriteBufferEntries ::
     HasFS IO h
  -> ResolveSerialisedValue
  -> Ref (WriteBufferBlobs IO h)
  -> NumEntries
  -> WriteBuffer
  -> V.Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)
  -> IO (WriteBuffer, V.Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)) #-}
-- | Add entries to the write buffer up until a certain write buffer size @n@.
--
-- Returns the updated write buffer together with the entries that were not
-- added. Blobs of added entries are written to the given write buffer blobs
-- with 'WBB.addBlob' before the entry is inserted.
--
-- NOTE: if the write buffer is larger than @n@ already, this is a no-op.
--
-- NOTE(review): in that case the first assertion below
-- (@numEntries wb' <= n@) does not hold for a buffer that is /strictly/ larger
-- than @n@ -- confirm whether callers are expected to never pass such a
-- buffer.
addWriteBufferEntries ::
     (MonadSTM m, MonadThrow m, PrimMonad m)
  => HasFS m h
  -> ResolveSerialisedValue
  -> Ref (WriteBufferBlobs m h)
  -> NumEntries
  -> WriteBuffer
  -> V.Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)
  -> m (WriteBuffer, V.Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob))
addWriteBufferEntries hfs f wbblobs maxn =
    \wb es ->
      (\ r@(wb', es') ->
          -- never exceed the write buffer capacity
          assert (WB.numEntries wb' <= maxn) $
          -- If the new write buffer has not reached capacity yet, then it must
          -- be the case that we have performed all the updates.
          assert ((WB.numEntries wb'  < maxn && V.null es')
               || (WB.numEntries wb' == maxn)) $
          r)
      <$> go wb es
  where
    --TODO: exception safety for async exceptions or I/O errors from writing blobs
    go !wb !es
        -- The buffer is at (or beyond) capacity: stop, returning the rest.
      | WB.numEntries wb >= maxn = pure (wb, es)

        -- There is room and an entry left: store its blob (if any), insert
        -- the entry, and continue with the remaining entries.
      | Just ((k, e), es') <- V.uncons es = do
          e' <- traverse (WBB.addBlob hfs wbblobs) e
          go (WB.addEntry f k e' wb) es'

        -- No entries left.
      | otherwise = pure (wb, es)


{-# SPECIALISE flushWriteBuffer ::
     Tracer IO (AtLevel MergeTrace)
  -> TableConfig
  -> ResolveSerialisedValue
  -> HasFS IO h
  -> HasBlockIO IO h
  -> SessionRoot
  -> UniqCounter IO
  -> ActionRegistry IO
  -> TableContent IO h
  -> IO (TableContent IO h) #-}
-- | Flush the write buffer to disk, regardless of whether it is full or not.
--
-- If the write buffer is empty, the table content is returned unchanged.
--
-- The returned table content contains an updated set of levels, where the write
-- buffer is inserted into level 1. It also has an empty write buffer, fresh
-- write buffer blobs, and a levels cache rebuilt for the new levels; the union
-- level is carried over as is.
flushWriteBuffer ::
     (MonadMask m, MonadMVar m, MonadST m, MonadSTM m)
  => Tracer m (AtLevel MergeTrace)
  -> TableConfig
  -> ResolveSerialisedValue
  -> HasFS m h
  -> HasBlockIO m h
  -> SessionRoot
  -> UniqCounter m
  -> ActionRegistry m
  -> TableContent m h
  -> m (TableContent m h)
flushWriteBuffer tr conf resolve hfs hbio root uc reg tc
  | WB.null (tableWriteBuffer tc) = pure tc
  | otherwise = do
    -- The same unique is used for the paths of the new run and for the path
    -- of the new write buffer blob file below.
    !uniq <- incrUniqCounter uc
    let !size      = WB.numEntries (tableWriteBuffer tc)
        !ln        = LevelNo 1
        (!runParams,
         runPaths) = mergingRunParamsForLevel
                       (Paths.activeDir root) conf uniq ln

    traceWith tr $ AtLevel ln $
      TraceFlushWriteBuffer size (runNumber runPaths) runParams
    -- Write the current write buffer (and its blobs) out as a new run.
    r <- withRollback reg
            (Run.fromWriteBuffer
              hfs hbio
              runParams runPaths
              (tableWriteBuffer tc)
              (tableWriteBufferBlobs tc))
            releaseRef
    -- The old write buffer blobs are no longer referenced by the returned
    -- table content; their release is handed to the registry.
    delayedCommit reg (releaseRef (tableWriteBufferBlobs tc))
    wbblobs' <- withRollback reg (WBB.new hfs (Paths.tableBlobPath root uniq))
                                 releaseRef
    levels' <- addRunToLevels tr conf resolve hfs hbio root uc r reg
                 (tableLevels tc)
                 (tableUnionLevel tc)
    tableCache' <- rebuildCache reg (tableCache tc) levels'
    pure $! TableContent {
        tableWriteBuffer = WB.empty
      , tableWriteBufferBlobs = wbblobs'
      , tableLevels = levels'
      , tableCache = tableCache'
        -- TODO: move into regular levels if merge completed and size fits
      , tableUnionLevel = tableUnionLevel tc
      }

{-# SPECIALISE addRunToLevels ::
     Tracer IO (AtLevel MergeTrace)
  -> TableConfig
  -> ResolveSerialisedValue
  -> HasFS IO h
  -> HasBlockIO IO h
  -> SessionRoot
  -> UniqCounter IO
  -> Ref (Run IO h)
  -> ActionRegistry IO
  -> Levels IO h
  -> UnionLevel IO h
  -> IO (Levels IO h) #-}
-- | Add a run to the levels, and propagate merges.
--
-- NOTE: @go@ is based on the @ScheduledMerges.increment@ prototype.
-- See @ScheduledMerges.increment@ for documentation about the merge algorithm.
addRunToLevels ::
     forall m h.
     (MonadMask m, MonadMVar m, MonadST m, MonadSTM m)
  => Tracer m (AtLevel MergeTrace)
  -> TableConfig
  -> ResolveSerialisedValue
  -> HasFS m h
  -> HasBlockIO m h
  -> SessionRoot
  -> UniqCounter m
  -> Ref (Run m h)
  -> ActionRegistry m
  -> Levels m h
  -> UnionLevel m h
  -> m (Levels m h)
addRunToLevels :: forall (m :: * -> *) h.
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m) =>
Tracer m (AtLevel MergeTrace)
-> TableConfig
-> ResolveSerialisedValue
-> HasFS m h
-> HasBlockIO m h
-> SessionRoot
-> UniqCounter m
-> Ref (Run m h)
-> ActionRegistry m
-> Levels m h
-> UnionLevel m h
-> m (Levels m h)
addRunToLevels Tracer m (AtLevel MergeTrace)
tr conf :: TableConfig
conf@TableConfig{MergeSchedule
DiskCachePolicy
FencePointerIndexType
BloomFilterAlloc
WriteBufferAlloc
SizeRatio
MergePolicy
confWriteBufferAlloc :: TableConfig -> WriteBufferAlloc
confMergePolicy :: MergePolicy
confSizeRatio :: SizeRatio
confWriteBufferAlloc :: WriteBufferAlloc
confBloomFilterAlloc :: BloomFilterAlloc
confFencePointerIndex :: FencePointerIndexType
confDiskCachePolicy :: DiskCachePolicy
confMergeSchedule :: MergeSchedule
confMergePolicy :: TableConfig -> MergePolicy
confSizeRatio :: TableConfig -> SizeRatio
confBloomFilterAlloc :: TableConfig -> BloomFilterAlloc
confFencePointerIndex :: TableConfig -> FencePointerIndexType
confDiskCachePolicy :: TableConfig -> DiskCachePolicy
confMergeSchedule :: TableConfig -> MergeSchedule
..} ResolveSerialisedValue
resolve HasFS m h
hfs HasBlockIO m h
hbio SessionRoot
root UniqCounter m
uc Ref (Run m h)
r0 ActionRegistry m
reg Levels m h
levels UnionLevel m h
ul = do
    LevelNo -> Vector (Ref (Run m h)) -> Levels m h -> m (Levels m h)
go (Int -> LevelNo
LevelNo Int
1) (Ref (Run m h) -> Vector (Ref (Run m h))
forall a. a -> Vector a
V.singleton Ref (Run m h)
r0) Levels m h
levels
  where
    -- NOTE: @go@ is based on the @increment@ function from the
    -- @ScheduledMerges@ prototype.
    --
    -- Releases the vector of runs.
    go ::
         LevelNo
      -> V.Vector (Ref (Run m h))
      -> V.Vector (Level m h )
      -> m (V.Vector (Level m h))
    go :: LevelNo -> Vector (Ref (Run m h)) -> Levels m h -> m (Levels m h)
go !LevelNo
ln Vector (Ref (Run m h))
rs (Levels m h -> Maybe (Level m h, Levels m h)
forall a. Vector a -> Maybe (a, Vector a)
V.uncons -> Maybe (Level m h, Levels m h)
Nothing) = do
        Tracer m (AtLevel MergeTrace) -> AtLevel MergeTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith Tracer m (AtLevel MergeTrace)
tr (AtLevel MergeTrace -> m ()) -> AtLevel MergeTrace -> m ()
forall a b. (a -> b) -> a -> b
$ LevelNo -> MergeTrace -> AtLevel MergeTrace
forall a. LevelNo -> a -> AtLevel a
AtLevel LevelNo
ln MergeTrace
TraceAddLevel
        -- Make a new level
        let policyForLevel :: MergePolicyForLevel
policyForLevel = MergePolicy
-> LevelNo -> Levels m h -> UnionLevel m h -> MergePolicyForLevel
forall (m :: * -> *) h.
MergePolicy
-> LevelNo -> Levels m h -> UnionLevel m h -> MergePolicyForLevel
mergePolicyForLevel MergePolicy
confMergePolicy LevelNo
ln Levels m h
forall a. Vector a
V.empty UnionLevel m h
ul
        IncomingRun m h
ir <- MergePolicyForLevel
-> LevelMergeType
-> LevelNo
-> Vector (Ref (Run m h))
-> m (IncomingRun m h)
newMerge MergePolicyForLevel
policyForLevel LevelMergeType
MR.MergeLastLevel LevelNo
ln Vector (Ref (Run m h))
rs
        Levels m h -> m (Levels m h)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Levels m h -> m (Levels m h)) -> Levels m h -> m (Levels m h)
forall a b. (a -> b) -> a -> b
$! Level m h -> Levels m h
forall a. a -> Vector a
V.singleton (Level m h -> Levels m h) -> Level m h -> Levels m h
forall a b. (a -> b) -> a -> b
$ IncomingRun m h -> Vector (Ref (Run m h)) -> Level m h
forall (m :: * -> *) h.
IncomingRun m h -> Vector (Ref (Run m h)) -> Level m h
Level IncomingRun m h
ir Vector (Ref (Run m h))
forall a. Vector a
V.empty
    go !LevelNo
ln Vector (Ref (Run m h))
rs' (Levels m h -> Maybe (Level m h, Levels m h)
forall a. Vector a -> Maybe (a, Vector a)
V.uncons -> Just (Level IncomingRun m h
ir Vector (Ref (Run m h))
rs, Levels m h
ls)) = do
        Ref (Run m h)
r <- LevelNo -> IncomingRun m h -> m (Ref (Run m h))
expectCompletedMerge LevelNo
ln IncomingRun m h
ir
        case MergePolicy
-> LevelNo -> Levels m h -> UnionLevel m h -> MergePolicyForLevel
forall (m :: * -> *) h.
MergePolicy
-> LevelNo -> Levels m h -> UnionLevel m h -> MergePolicyForLevel
mergePolicyForLevel MergePolicy
confMergePolicy LevelNo
ln Levels m h
ls UnionLevel m h
ul of
          -- If r is still too small for this level then keep it and merge again
          -- with the incoming runs.
          MergePolicyForLevel
LevelTiering | Ref (Run m h) -> NumEntries
forall (m :: * -> *) h. Ref (Run m h) -> NumEntries
Run.size Ref (Run m h)
r NumEntries -> NumEntries -> Bool
forall a. Ord a => a -> a -> Bool
<= TableConfig -> MergePolicyForLevel -> LevelNo -> NumEntries
maxRunSize' TableConfig
conf MergePolicyForLevel
LevelTiering (LevelNo -> LevelNo
forall a. Enum a => a -> a
pred LevelNo
ln) -> do
            let mergeType :: LevelMergeType
mergeType = Levels m h -> UnionLevel m h -> LevelMergeType
forall (m :: * -> *) h.
Levels m h -> UnionLevel m h -> LevelMergeType
mergeTypeForLevel Levels m h
ls UnionLevel m h
ul
            IncomingRun m h
ir' <- MergePolicyForLevel
-> LevelMergeType
-> LevelNo
-> Vector (Ref (Run m h))
-> m (IncomingRun m h)
newMerge MergePolicyForLevel
LevelTiering LevelMergeType
mergeType LevelNo
ln (Vector (Ref (Run m h))
rs' Vector (Ref (Run m h)) -> Ref (Run m h) -> Vector (Ref (Run m h))
forall a. Vector a -> a -> Vector a
`V.snoc` Ref (Run m h)
r)
            Levels m h -> m (Levels m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Levels m h -> m (Levels m h)) -> Levels m h -> m (Levels m h)
forall a b. (a -> b) -> a -> b
$! IncomingRun m h -> Vector (Ref (Run m h)) -> Level m h
forall (m :: * -> *) h.
IncomingRun m h -> Vector (Ref (Run m h)) -> Level m h
Level IncomingRun m h
ir' Vector (Ref (Run m h))
rs Level m h -> Levels m h -> Levels m h
forall a. a -> Vector a -> Vector a
`V.cons` Levels m h
ls
          -- This tiering level is now full. We take the completed merged run
          -- (the previous incoming runs), plus all the other runs on this level
          -- as a bundle and move them down to the level below. We start a merge
          -- for the new incoming runs. This level is otherwise empty.
          MergePolicyForLevel
LevelTiering | SizeRatio -> Vector (Ref (Run m h)) -> Bool
forall run. SizeRatio -> Vector run -> Bool
levelIsFull SizeRatio
confSizeRatio Vector (Ref (Run m h))
rs -> do
            IncomingRun m h
ir' <- MergePolicyForLevel
-> LevelMergeType
-> LevelNo
-> Vector (Ref (Run m h))
-> m (IncomingRun m h)
newMerge MergePolicyForLevel
LevelTiering LevelMergeType
MR.MergeMidLevel LevelNo
ln Vector (Ref (Run m h))
rs'
            Levels m h
ls' <- LevelNo -> Vector (Ref (Run m h)) -> Levels m h -> m (Levels m h)
go (LevelNo -> LevelNo
forall a. Enum a => a -> a
succ LevelNo
ln) (Ref (Run m h)
r Ref (Run m h) -> Vector (Ref (Run m h)) -> Vector (Ref (Run m h))
forall a. a -> Vector a -> Vector a
`V.cons` Vector (Ref (Run m h))
rs) Levels m h
ls
            Levels m h -> m (Levels m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Levels m h -> m (Levels m h)) -> Levels m h -> m (Levels m h)
forall a b. (a -> b) -> a -> b
$! IncomingRun m h -> Vector (Ref (Run m h)) -> Level m h
forall (m :: * -> *) h.
IncomingRun m h -> Vector (Ref (Run m h)) -> Level m h
Level IncomingRun m h
ir' Vector (Ref (Run m h))
forall a. Vector a
V.empty Level m h -> Levels m h -> Levels m h
forall a. a -> Vector a -> Vector a
`V.cons` Levels m h
ls'
          -- This tiering level is not yet full. We move the completed merged run
          -- into the level proper, and start the new merge for the incoming runs.
          MergePolicyForLevel
LevelTiering -> do
            let mergeType :: LevelMergeType
mergeType = Levels m h -> UnionLevel m h -> LevelMergeType
forall (m :: * -> *) h.
Levels m h -> UnionLevel m h -> LevelMergeType
mergeTypeForLevel Levels m h
ls UnionLevel m h
ul
            IncomingRun m h
ir' <- MergePolicyForLevel
-> LevelMergeType
-> LevelNo
-> Vector (Ref (Run m h))
-> m (IncomingRun m h)
newMerge MergePolicyForLevel
LevelTiering LevelMergeType
mergeType LevelNo
ln Vector (Ref (Run m h))
rs'
            Tracer m (AtLevel MergeTrace) -> AtLevel MergeTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith Tracer m (AtLevel MergeTrace)
tr (AtLevel MergeTrace -> m ()) -> AtLevel MergeTrace -> m ()
forall a b. (a -> b) -> a -> b
$ LevelNo -> MergeTrace -> AtLevel MergeTrace
forall a. LevelNo -> a -> AtLevel a
AtLevel LevelNo
ln
                         (MergeTrace -> AtLevel MergeTrace)
-> MergeTrace -> AtLevel MergeTrace
forall a b. (a -> b) -> a -> b
$ RunNumber -> Vector RunNumber -> MergeTrace
TraceAddRun
                            (Ref (Run m h) -> RunNumber
forall (m :: * -> *) h. Ref (Run m h) -> RunNumber
Run.runFsPathsNumber Ref (Run m h)
r)
                            ((Ref (Run m h) -> RunNumber)
-> Vector (Ref (Run m h)) -> Vector RunNumber
forall a b. (a -> b) -> Vector a -> Vector b
V.map Ref (Run m h) -> RunNumber
forall (m :: * -> *) h. Ref (Run m h) -> RunNumber
Run.runFsPathsNumber Vector (Ref (Run m h))
rs)
            Levels m h -> m (Levels m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Levels m h -> m (Levels m h)) -> Levels m h -> m (Levels m h)
forall a b. (a -> b) -> a -> b
$! IncomingRun m h -> Vector (Ref (Run m h)) -> Level m h
forall (m :: * -> *) h.
IncomingRun m h -> Vector (Ref (Run m h)) -> Level m h
Level IncomingRun m h
ir' (Ref (Run m h)
r Ref (Run m h) -> Vector (Ref (Run m h)) -> Vector (Ref (Run m h))
forall a. a -> Vector a -> Vector a
`V.cons` Vector (Ref (Run m h))
rs) Level m h -> Levels m h -> Levels m h
forall a. a -> Vector a -> Vector a
`V.cons` Levels m h
ls
          -- The final level is using levelling. If the existing completed merge
          -- run is too large for this level, we promote the run to the next
          -- level and start merging the incoming runs into this (otherwise
          -- empty) level.
          MergePolicyForLevel
LevelLevelling | Ref (Run m h) -> NumEntries
forall (m :: * -> *) h. Ref (Run m h) -> NumEntries
Run.size Ref (Run m h)
r NumEntries -> NumEntries -> Bool
forall a. Ord a => a -> a -> Bool
> TableConfig -> MergePolicyForLevel -> LevelNo -> NumEntries
maxRunSize' TableConfig
conf MergePolicyForLevel
LevelLevelling LevelNo
ln -> do
            Bool -> m () -> m ()
forall a. HasCallStack => Bool -> a -> a
assert (Vector (Ref (Run m h)) -> Bool
forall a. Vector a -> Bool
V.null Vector (Ref (Run m h))
rs Bool -> Bool -> Bool
&& Levels m h -> Bool
forall a. Vector a -> Bool
V.null Levels m h
ls) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ () -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
            IncomingRun m h
ir' <- MergePolicyForLevel
-> LevelMergeType
-> LevelNo
-> Vector (Ref (Run m h))
-> m (IncomingRun m h)
newMerge MergePolicyForLevel
LevelTiering LevelMergeType
MR.MergeMidLevel LevelNo
ln Vector (Ref (Run m h))
rs'
            Levels m h
ls' <- LevelNo -> Vector (Ref (Run m h)) -> Levels m h -> m (Levels m h)
go (LevelNo -> LevelNo
forall a. Enum a => a -> a
succ LevelNo
ln) (Ref (Run m h) -> Vector (Ref (Run m h))
forall a. a -> Vector a
V.singleton Ref (Run m h)
r) Levels m h
forall a. Vector a
V.empty
            Levels m h -> m (Levels m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Levels m h -> m (Levels m h)) -> Levels m h -> m (Levels m h)
forall a b. (a -> b) -> a -> b
$! IncomingRun m h -> Vector (Ref (Run m h)) -> Level m h
forall (m :: * -> *) h.
IncomingRun m h -> Vector (Ref (Run m h)) -> Level m h
Level IncomingRun m h
ir' Vector (Ref (Run m h))
forall a. Vector a
V.empty Level m h -> Levels m h -> Levels m h
forall a. a -> Vector a -> Vector a
`V.cons` Levels m h
ls'
          -- Otherwise we start merging the incoming runs into the run.
          MergePolicyForLevel
LevelLevelling -> do
            Bool -> m () -> m ()
forall a. HasCallStack => Bool -> a -> a
assert (Vector (Ref (Run m h)) -> Bool
forall a. Vector a -> Bool
V.null Vector (Ref (Run m h))
rs Bool -> Bool -> Bool
&& Levels m h -> Bool
forall a. Vector a -> Bool
V.null Levels m h
ls) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ () -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
            IncomingRun m h
ir' <- MergePolicyForLevel
-> LevelMergeType
-> LevelNo
-> Vector (Ref (Run m h))
-> m (IncomingRun m h)
newMerge MergePolicyForLevel
LevelLevelling LevelMergeType
MR.MergeLastLevel LevelNo
ln (Vector (Ref (Run m h))
rs' Vector (Ref (Run m h)) -> Ref (Run m h) -> Vector (Ref (Run m h))
forall a. Vector a -> a -> Vector a
`V.snoc` Ref (Run m h)
r)
            Levels m h -> m (Levels m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Levels m h -> m (Levels m h)) -> Levels m h -> m (Levels m h)
forall a b. (a -> b) -> a -> b
$! IncomingRun m h -> Vector (Ref (Run m h)) -> Level m h
forall (m :: * -> *) h.
IncomingRun m h -> Vector (Ref (Run m h)) -> Level m h
Level IncomingRun m h
ir' Vector (Ref (Run m h))
forall a. Vector a
V.empty Level m h -> Levels m h -> Levels m h
forall a. a -> Vector a -> Vector a
`V.cons` Levels m h
forall a. Vector a
V.empty

    -- Obtains the run held by an incoming run and traces
    -- 'TraceExpectCompletedMerge' for it. Releases the incoming run.
    --
    -- * For a 'Single' incoming run, the contained run reference is returned
    --   as is.
    --
    -- * For a 'Merging' incoming run, the run is obtained using
    --   'MR.expectCompleted', with 'releaseRef' registered as the rollback
    --   action for the obtained reference. Releasing the reference to the
    --   merging run itself is registered as a delayed commit action.
    --
    -- The action registry @reg@ and the tracer @tr@ are bound by the enclosing
    -- definition.
    expectCompletedMerge :: LevelNo -> IncomingRun m h -> m (Ref (Run m h))
    expectCompletedMerge ln ir = do
      r <- case ir of
        Single         r' -> pure r'
        Merging _ _ _ mr  -> do
          r' <- withRollback reg (MR.expectCompleted mr) releaseRef
          -- The merging run is no longer needed once we hold the result.
          delayedCommit reg (releaseRef mr)
          pure r'
      traceWith tr $ AtLevel ln $
        TraceExpectCompletedMerge (Run.runFsPathsNumber r)
      pure r

    -- Creates a new incoming run at the given level from the given runs.
    -- Consumes and releases the runs.
    --
    -- The incoming run is created using 'newIncomingRunAtLevel', with
    -- 'releaseIncomingRun' registered as its rollback action. If the merge
    -- schedule is 'OneShot', the incoming run is completed immediately and
    -- 'TraceCompletedMerge' is traced for the resulting run.
    --
    -- The action registry @reg@, the tracer @tr@, the configuration and the
    -- other session-level arguments are bound by the enclosing definition.
    newMerge :: MergePolicyForLevel
             -> MR.LevelMergeType
             -> LevelNo
             -> V.Vector (Ref (Run m h))
             -> m (IncomingRun m h)
    newMerge mergePolicy mergeType ln rs = do
      ir <- withRollback reg
              (newIncomingRunAtLevel tr hfs hbio
                                     root uc conf resolve
                                     mergePolicy mergeType ln rs)
              releaseIncomingRun
      -- The runs will end up inside the incoming/merging run, with fresh
      -- references (since newIncoming* will make duplicates).
      -- The original references must be released (but only on the happy path).
      V.forM_ rs $ \r -> delayedCommit reg (releaseRef r)
      case confMergeSchedule of
        Incremental -> pure ()
        OneShot     ->
          -- The reference returned by 'immediatelyCompleteIncomingRun' is only
          -- needed for tracing, so it is released again when the bracket
          -- exits.
          bracket
            (immediatelyCompleteIncomingRun conf ln ir)
            releaseRef $ \r ->

            traceWith tr $ AtLevel ln $
              TraceCompletedMerge (Run.size r) (Run.runFsPathsNumber r)

      return ir

{-# SPECIALISE newIncomingRunAtLevel ::
     Tracer IO (AtLevel MergeTrace)
  -> HasFS IO h
  -> HasBlockIO IO h
  -> SessionRoot
  -> UniqCounter IO
  -> TableConfig
  -> ResolveSerialisedValue
  -> MergePolicyForLevel
  -> MR.LevelMergeType
  -> LevelNo
  -> V.Vector (Ref (Run IO h))
  -> IO (IncomingRun IO h) #-}
-- | Create an incoming run for the given level from the given runs.
--
-- * If the vector contains exactly one run, no merge is started:
--   'TraceNewMergeSingleRun' is traced and the run is wrapped using
--   'newIncomingSingleRun'.
--
-- * Otherwise, a fresh 'Unique' is drawn from the counter to derive the run
--   parameters and paths of the merge output, 'TraceNewMerge' is traced, and a
--   new merging run is created using 'MR.new' and wrapped using
--   'newIncomingMergingRun'. The local reference to the merging run is
--   released again when the 'bracket' exits.
--
-- With assertions enabled, the total debt of a newly created merging run is
-- checked not to exceed 'maxMergeDebt' for this level and merge policy.
newIncomingRunAtLevel ::
     (MonadMVar m, MonadMask m, MonadSTM m, MonadST m)
  => Tracer m (AtLevel MergeTrace)
  -> HasFS m h
  -> HasBlockIO m h
  -> SessionRoot
  -> UniqCounter m
  -> TableConfig
  -> ResolveSerialisedValue
  -> MergePolicyForLevel
  -> MR.LevelMergeType
  -> LevelNo
  -> V.Vector (Ref (Run m h))
  -> m (IncomingRun m h)
newIncomingRunAtLevel tr hfs hbio
                      root uc conf resolve
                      mergePolicy mergeType ln rs
  | Just (r, rest) <- V.uncons rs, V.null rest = do

    traceWith tr $ AtLevel ln $
      TraceNewMergeSingleRun (Run.size r) (Run.runFsPathsNumber r)

    newIncomingSingleRun r

  | otherwise = do

    uniq <- incrUniqCounter uc
    let (!runParams, !runPaths) =
          mergingRunParamsForLevel (Paths.activeDir root) conf uniq ln

    traceWith tr $ AtLevel ln $
      TraceNewMerge (V.map Run.size rs) (runNumber runPaths)
                    runParams mergePolicy mergeType

    bracket
      (MR.new hfs hbio resolve runParams mergeType runPaths rs)
      releaseRef $ \mr ->
        assert (MR.totalMergeDebt mr <= maxMergeDebt conf mergePolicy ln) $
        let nominalDebt = nominalDebtForLevel conf ln in
        newIncomingMergingRun mergePolicy nominalDebt mr

-- | Compute the run parameters and the file system paths for the output run
-- of a merge at the given (regular) level.
--
-- The run parameters are derived from the table configuration using
-- 'runParamsForLevel'. The run paths are located in the given active
-- directory and are numbered using the given 'Unique'.
mergingRunParamsForLevel ::
     ActiveDir
  -> TableConfig
  -> Unique
  -> LevelNo
  -> (RunParams, RunFsPaths)
mergingRunParamsForLevel dir conf unique ln =
    (runParamsForLevel conf (RegularLevel ln), runPaths)
  where
    -- Strict binding: the paths are evaluated as soon as the result pair is
    -- forced, rather than being left as a thunk inside the pair.
    !runPaths = Paths.runPath dir (uniqueToRunNumber unique)

-- | We use levelling on the last level, unless that is also the first level.
--
-- A level counts as the last level if there are no further levels below it
-- /and/ there is no union level. All other levels use tiering.
mergePolicyForLevel ::
     MergePolicy
  -> LevelNo
  -> Levels m h
  -> UnionLevel m h
  -> MergePolicyForLevel
mergePolicyForLevel MergePolicyLazyLevelling (LevelNo n) nextLevels unionLevel
  | n == 1
  = LevelTiering    -- always use tiering on first level

  | V.null nextLevels
  , NoUnion <- unionLevel
  = LevelLevelling  -- levelling on last level

  | otherwise
  = LevelTiering

-- $setup
-- >>> import Database.LSMTree.Internal.Entry
-- >>> import Database.LSMTree.Internal.Config

-- | Compute the maximum size of a run for a given level.
--
-- Level 0 always has a maximum run size of 0. Negative level numbers are
-- rejected with an 'error'.
--
-- The maximum @size@ of a tiering run at a level @>= 1@ is
-- @bufferSize*sizeRatio^(level-1)@.
--
-- >>> unNumEntries . maxRunSize Four (AllocNumEntries (NumEntries 2)) LevelTiering . LevelNo <$> [0, 1, 2, 3, 4]
-- [0,2,8,32,128]
--
-- The maximum @size@ of a levelling run at a level @>= 1@ is
-- @bufferSize*sizeRatio^level@. A levelling run can take up a whole level, so
-- the maximum size of a run is @sizeRatio*@ larger than the maximum size of a
-- tiering run on the same level.
--
-- >>> unNumEntries . maxRunSize Four (AllocNumEntries (NumEntries 2)) LevelLevelling . LevelNo <$> [0, 1, 2, 3, 4]
-- [0,8,32,128,512]
maxRunSize ::
     SizeRatio
  -> WriteBufferAlloc
  -> MergePolicyForLevel
  -> LevelNo
  -> NumEntries
maxRunSize _ _ _ (LevelNo ln)
  -- Level 0 is accepted (see the next guard), so only negative level numbers
  -- are invalid.
  | ln < 0  = error "maxRunSize: negative level number"
  | ln == 0 = NumEntries 0

maxRunSize sizeRatio (AllocNumEntries bufferSize) LevelTiering ln =
    NumEntries $ maxRunSizeTiering (sizeRatioInt sizeRatio) bufferSize ln

maxRunSize sizeRatio (AllocNumEntries bufferSize) LevelLevelling ln =
    NumEntries $ maxRunSizeLevelling (sizeRatioInt sizeRatio) bufferSize ln

-- | Maximum run sizes as plain 'Int's, given the size ratio, the write buffer
-- size and the level number.
--
-- * 'maxRunSizeTiering' computes @bufferSize * sizeRatio ^ (level - 1)@.
--
-- * 'maxRunSizeLevelling' is 'maxRunSizeTiering' for the next level, i.e.
--   @bufferSize * sizeRatio ^ level@.
--
-- Precondition: the exponent must not be negative, since '^' raises an error
-- for negative exponents. That means @level >= 1@ for 'maxRunSizeTiering' and
-- @level >= 0@ for 'maxRunSizeLevelling'.
maxRunSizeTiering, maxRunSizeLevelling :: Int -> NumEntries -> LevelNo -> Int
maxRunSizeTiering sizeRatio (NumEntries bufferSize) (LevelNo ln) =
    bufferSize * sizeRatio ^ pred ln

maxRunSizeLevelling sizeRatio bufferSize ln =
    maxRunSizeTiering sizeRatio bufferSize (succ ln)

-- | Like 'maxRunSize', but takes the size ratio and the write buffer
-- allocation from the given table configuration.
maxRunSize' :: TableConfig -> MergePolicyForLevel -> LevelNo -> NumEntries
maxRunSize' config policy ln =
    maxRunSize (confSizeRatio config) (confWriteBufferAlloc config) policy ln

-- | If there are no further levels provided, this level is the last one.
-- However, if a 'Union' is present, it acts as another (last) level.
--
-- The result is 'MR.MergeLastLevel' only if the given levels are empty and
-- the union level is 'NoUnion'. Otherwise it is 'MR.MergeMidLevel'.
mergeTypeForLevel :: Levels m h -> UnionLevel m h -> MR.LevelMergeType
mergeTypeForLevel levels unionLevel
  | V.null levels, NoUnion <- unionLevel = MR.MergeLastLevel
  | otherwise                            = MR.MergeMidLevel

-- | Check whether a level would be full once one more run is added to the
-- given resident runs, i.e. whether the number of runs plus one reaches the
-- size ratio.
levelIsFull :: SizeRatio -> V.Vector run -> Bool
levelIsFull sr rs = V.length rs + 1 >= sizeRatioInt sr

{-------------------------------------------------------------------------------
  Credits
-------------------------------------------------------------------------------}

{-
  Note [Credits]
~~~~~~~~~~~~~~

  With scheduled merges, each update (e.g., insert) on a table contributes to
  the progression of ongoing merges in the levels structure. This ensures that
  merges are finished in time before a new merge has to be started. The points
  in the evolution of the levels structure where new merges are started are
  known: a flush of a full write buffer will create a new run on the first
  level, and after sufficient flushes (e.g., 4) we will start at least one new
  merge on the second level. This may cascade down to lower levels depending on
  how full the levels are. As such, we have a well-defined measure to determine
  when merges should be finished: it only depends on the maximum size of the
  write buffer!

  The simplest solution to making sure merges are done in time is to step them
  to completion immediately when started. This does not, however, spread out
  work over time nicely. Instead, we schedule merge work based on how many
  updates are made on the table, taking care to ensure that the merge is
  finished /just/ in time before the next flush comes around, and not too early.

  TODO: we can still spread out work more evenly over time. We are finishing
  some merges too early, for example. See 'creditsForMerge'.

  The progression is tracked using merge credits, where each single update
  contributes a single credit to each ongoing merge. This is equivalent to
  saying we contribute a credit to each level, since each level contains
  precisely one ongoing merge. Contributing a credit does not, however,
  translate directly to doing one /unit/ of merging work:

  * The amount of work to do for one credit is adjusted depending on the type of
    merge we are doing. Last-level merges, for example, can have larger inputs,
    and therefore we have to do a little more work for each credit. As such, we
    /scale/ credits for the specific type of merge.

  * Unspent credits are accumulated until they go over a threshold, after which
    a batch of merge work will be performed. Configuring this threshold should
    make it possible to strike a good balance between spreading out I/O and
    achieving good (concurrent) performance.

  Merging runs can be shared across tables, which means that multiple threads
  can contribute to the same merge concurrently.
-}

{-# SPECIALISE supplyCredits ::
     TableConfig
  -> NominalCredits
  -> Levels IO h
  -> IO ()
  #-}
-- | Supply the given amount of credits to each merge in the levels structure.
-- This /may/ cause some merges to progress.
--
-- The same deposit is supplied to the incoming run of every level. The
-- resident runs of a level are not involved.
supplyCredits ::
     (MonadSTM m, MonadST m, MonadMVar m, MonadMask m)
  => TableConfig
  -> NominalCredits
  -> Levels m h
  -> m ()
supplyCredits conf deposit levels =
    iforLevelM_ levels $ \ln (Level ir _rs) ->
      supplyCreditsIncomingRun conf ln ir deposit
      --TODO: consider tracing supply of credits,
      -- supplyCreditsIncomingRun could easily return the supplied credits
      -- before & after, which may be useful for tracing.

-- | An upper bound on the total merge debt of a merge at the given level,
-- depending on the merge policy used for that level.
--
-- * Levelling: @sizeRatio@ runs of the maximum tiering run size of the
--   previous level, plus one run of the maximum levelling run size of this
--   level.
--
-- * Tiering: @sizeRatio + 1@ runs of the maximum tiering run size of the
--   previous level.
--
-- Note that this uses 'maxRunSizeTiering' at the previous level, which is only
-- defined if the previous level number is at least 1.
maxMergeDebt :: TableConfig -> MergePolicyForLevel -> LevelNo -> MergeDebt
maxMergeDebt TableConfig {
               confWriteBufferAlloc = AllocNumEntries bufferSize,
               confSizeRatio
             } mergePolicy ln =
    let !sizeRatio = sizeRatioInt confSizeRatio in
    case mergePolicy of
      LevelLevelling ->
        MergeDebt . MergeCredits $
          sizeRatio * maxRunSizeTiering sizeRatio bufferSize (pred ln)
                    + maxRunSizeLevelling sizeRatio bufferSize ln

      LevelTiering   ->
        MergeDebt . MergeCredits $
          maxRuns * maxRunSizeTiering sizeRatio bufferSize (pred ln)
        where
          -- We can hold back underfull runs, so sometimes there are n+1 runs,
          -- rather than the typical n at a tiering level (n = LSM size ratio).
          maxRuns = sizeRatio + 1

-- | The nominal debt equals the minimum of credits we will supply before we
-- expect the merge to complete. This is the same as the number of updates
-- in a run that gets moved to this level.
--
-- It is computed as the maximum tiering run size ('maxRunSizeTiering') for
-- the given level, using the size ratio and write buffer size from the table
-- configuration.
nominalDebtForLevel :: TableConfig -> LevelNo -> NominalDebt
nominalDebtForLevel TableConfig {
                      confWriteBufferAlloc = AllocNumEntries !bufferSize,
                      confSizeRatio
                    } ln =
    NominalDebt (maxRunSizeTiering (sizeRatioInt confSizeRatio) bufferSize ln)