{-# LANGUAGE CPP #-}
{-# LANGUAGE MagicHash #-}
{-# LANGUAGE UnboxedTuples #-}
{-# OPTIONS_HADDOCK not-home #-}
#if !(MIN_VERSION_GLASGOW_HASKELL(9,0,0,0))
{-# LANGUAGE DataKinds #-}
#endif
module Database.LSMTree.Internal.MergeSchedule (
AtLevel (..)
, MergeTrace (..)
, TableContent (..)
, duplicateTableContent
, releaseTableContent
, LevelsCache (..)
, mkLevelsCache
, Levels
, Level (..)
, MergePolicyForLevel (..)
, mergingRunParamsForLevel
, UnionLevel (..)
, updatesWithInterleavedFlushes
, flushWriteBuffer
, maxRunSize
, MergeDebt (..)
, MergeCredits (..)
, supplyCredits
, NominalDebt (..)
, NominalCredits (..)
, nominalDebtAsCredits
, nominalDebtForLevel
, addWriteBufferEntries
) where
import Control.ActionRegistry
import Control.Concurrent.Class.MonadMVar.Strict
import Control.Monad.Class.MonadST (MonadST)
import Control.Monad.Class.MonadSTM (MonadSTM (..))
import Control.Monad.Class.MonadThrow (MonadMask, MonadThrow (..))
import Control.Monad.Primitive
import Control.RefCount
import Control.Tracer
import Data.BloomFilter (Bloom)
import Data.Foldable (fold)
import qualified Data.Vector as V
import Database.LSMTree.Internal.Assertions (assert)
import Database.LSMTree.Internal.Config
import Database.LSMTree.Internal.Entry (Entry, NumEntries (..),
unNumEntries)
import Database.LSMTree.Internal.IncomingRun
import Database.LSMTree.Internal.Index (Index)
import Database.LSMTree.Internal.Lookup (ResolveSerialisedValue)
import Database.LSMTree.Internal.MergingRun (MergeCredits (..),
MergeDebt (..), MergingRun, RunParams (..))
import qualified Database.LSMTree.Internal.MergingRun as MR
import Database.LSMTree.Internal.MergingTree (MergingTree)
import Database.LSMTree.Internal.Paths (ActiveDir, RunFsPaths (..),
SessionRoot)
import qualified Database.LSMTree.Internal.Paths as Paths
import Database.LSMTree.Internal.Run (Run)
import qualified Database.LSMTree.Internal.Run as Run
import Database.LSMTree.Internal.RunNumber
import Database.LSMTree.Internal.Serialise (SerialisedBlob,
SerialisedKey, SerialisedValue)
import Database.LSMTree.Internal.UniqCounter
import Database.LSMTree.Internal.Vector (forMStrict, mapStrict)
import Database.LSMTree.Internal.WriteBuffer (WriteBuffer)
import qualified Database.LSMTree.Internal.WriteBuffer as WB
import Database.LSMTree.Internal.WriteBufferBlobs (WriteBufferBlobs)
import qualified Database.LSMTree.Internal.WriteBufferBlobs as WBB
import qualified System.FS.API as FS
import System.FS.API (HasFS)
import System.FS.BlockIO.API (HasBlockIO)
data AtLevel a = AtLevel LevelNo a
deriving stock Int -> AtLevel a -> ShowS
[AtLevel a] -> ShowS
AtLevel a -> String
(Int -> AtLevel a -> ShowS)
-> (AtLevel a -> String)
-> ([AtLevel a] -> ShowS)
-> Show (AtLevel a)
forall a. Show a => Int -> AtLevel a -> ShowS
forall a. Show a => [AtLevel a] -> ShowS
forall a. Show a => AtLevel a -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: forall a. Show a => Int -> AtLevel a -> ShowS
showsPrec :: Int -> AtLevel a -> ShowS
$cshow :: forall a. Show a => AtLevel a -> String
show :: AtLevel a -> String
$cshowList :: forall a. Show a => [AtLevel a] -> ShowS
showList :: [AtLevel a] -> ShowS
Show
data MergeTrace =
TraceFlushWriteBuffer
NumEntries
RunNumber
RunParams
| TraceAddLevel
| TraceAddRun
RunNumber
(V.Vector RunNumber)
| TraceNewMerge
(V.Vector NumEntries)
RunNumber
RunParams
MergePolicyForLevel
MR.LevelMergeType
| TraceNewMergeSingleRun
NumEntries
RunNumber
| TraceCompletedMerge
NumEntries
RunNumber
| TraceExpectCompletedMerge
RunNumber
deriving stock Int -> MergeTrace -> ShowS
[MergeTrace] -> ShowS
MergeTrace -> String
(Int -> MergeTrace -> ShowS)
-> (MergeTrace -> String)
-> ([MergeTrace] -> ShowS)
-> Show MergeTrace
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> MergeTrace -> ShowS
showsPrec :: Int -> MergeTrace -> ShowS
$cshow :: MergeTrace -> String
show :: MergeTrace -> String
$cshowList :: [MergeTrace] -> ShowS
showList :: [MergeTrace] -> ShowS
Show
data TableContent m h = TableContent {
forall (m :: * -> *) h. TableContent m h -> WriteBuffer
tableWriteBuffer :: !WriteBuffer
, forall (m :: * -> *) h.
TableContent m h -> Ref (WriteBufferBlobs m h)
tableWriteBufferBlobs :: !(Ref (WriteBufferBlobs m h))
, forall (m :: * -> *) h. TableContent m h -> Levels m h
tableLevels :: !(Levels m h)
, forall (m :: * -> *) h. TableContent m h -> LevelsCache m h
tableCache :: !(LevelsCache m h)
, forall (m :: * -> *) h. TableContent m h -> UnionLevel m h
tableUnionLevel :: !(UnionLevel m h)
}
{-# SPECIALISE duplicateTableContent :: ActionRegistry IO -> TableContent IO h -> IO (TableContent IO h) #-}
duplicateTableContent ::
(PrimMonad m, MonadMask m)
=> ActionRegistry m
-> TableContent m h
-> m (TableContent m h)
duplicateTableContent :: forall (m :: * -> *) h.
(PrimMonad m, MonadMask m) =>
ActionRegistry m -> TableContent m h -> m (TableContent m h)
duplicateTableContent ActionRegistry m
reg (TableContent WriteBuffer
wb Ref (WriteBufferBlobs m h)
wbb Levels m h
levels LevelsCache m h
cache UnionLevel m h
ul) = do
Ref (WriteBufferBlobs m h)
wbb' <- ActionRegistry m
-> m (Ref (WriteBufferBlobs m h))
-> (Ref (WriteBufferBlobs m h) -> m ())
-> m (Ref (WriteBufferBlobs m h))
forall (m :: * -> *) a.
(PrimMonad m, MonadMask m, HasCallStack) =>
ActionRegistry m -> m a -> (a -> m ()) -> m a
withRollback ActionRegistry m
reg (Ref (WriteBufferBlobs m h) -> m (Ref (WriteBufferBlobs m h))
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadThrow m, HasCallStack) =>
Ref obj -> m (Ref obj)
dupRef Ref (WriteBufferBlobs m h)
wbb) Ref (WriteBufferBlobs m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef
Levels m h
levels' <- ActionRegistry m -> Levels m h -> m (Levels m h)
forall (m :: * -> *) h.
(PrimMonad m, MonadMask m) =>
ActionRegistry m -> Levels m h -> m (Levels m h)
duplicateLevels ActionRegistry m
reg Levels m h
levels
LevelsCache m h
cache' <- ActionRegistry m -> LevelsCache m h -> m (LevelsCache m h)
forall (m :: * -> *) h.
(PrimMonad m, MonadMask m) =>
ActionRegistry m -> LevelsCache m h -> m (LevelsCache m h)
duplicateLevelsCache ActionRegistry m
reg LevelsCache m h
cache
UnionLevel m h
ul' <- ActionRegistry m -> UnionLevel m h -> m (UnionLevel m h)
forall (m :: * -> *) h.
(PrimMonad m, MonadMask m) =>
ActionRegistry m -> UnionLevel m h -> m (UnionLevel m h)
duplicateUnionLevel ActionRegistry m
reg UnionLevel m h
ul
TableContent m h -> m (TableContent m h)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (TableContent m h -> m (TableContent m h))
-> TableContent m h -> m (TableContent m h)
forall a b. (a -> b) -> a -> b
$! WriteBuffer
-> Ref (WriteBufferBlobs m h)
-> Levels m h
-> LevelsCache m h
-> UnionLevel m h
-> TableContent m h
forall (m :: * -> *) h.
WriteBuffer
-> Ref (WriteBufferBlobs m h)
-> Levels m h
-> LevelsCache m h
-> UnionLevel m h
-> TableContent m h
TableContent WriteBuffer
wb Ref (WriteBufferBlobs m h)
wbb' Levels m h
levels' LevelsCache m h
cache' UnionLevel m h
ul'
{-# SPECIALISE releaseTableContent :: ActionRegistry IO -> TableContent IO h -> IO () #-}
releaseTableContent ::
(PrimMonad m, MonadMask m)
=> ActionRegistry m
-> TableContent m h
-> m ()
releaseTableContent :: forall (m :: * -> *) h.
(PrimMonad m, MonadMask m) =>
ActionRegistry m -> TableContent m h -> m ()
releaseTableContent ActionRegistry m
reg (TableContent WriteBuffer
_wb Ref (WriteBufferBlobs m h)
wbb Levels m h
levels LevelsCache m h
cache UnionLevel m h
ul) = do
ActionRegistry m -> m () -> m ()
forall (m :: * -> *).
(PrimMonad m, HasCallStack) =>
ActionRegistry m -> m () -> m ()
delayedCommit ActionRegistry m
reg (Ref (WriteBufferBlobs m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef Ref (WriteBufferBlobs m h)
wbb)
ActionRegistry m -> Levels m h -> m ()
forall (m :: * -> *) h.
(PrimMonad m, MonadMask m) =>
ActionRegistry m -> Levels m h -> m ()
releaseLevels ActionRegistry m
reg Levels m h
levels
ActionRegistry m -> LevelsCache m h -> m ()
forall (m :: * -> *) h.
(PrimMonad m, MonadMask m) =>
ActionRegistry m -> LevelsCache m h -> m ()
releaseLevelsCache ActionRegistry m
reg LevelsCache m h
cache
ActionRegistry m -> UnionLevel m h -> m ()
forall (m :: * -> *) h.
(PrimMonad m, MonadMask m) =>
ActionRegistry m -> UnionLevel m h -> m ()
releaseUnionLevel ActionRegistry m
reg UnionLevel m h
ul
data LevelsCache m h = LevelsCache_ {
forall (m :: * -> *) h. LevelsCache m h -> Vector (Ref (Run m h))
cachedRuns :: !(V.Vector (Ref (Run m h)))
, forall (m :: * -> *) h.
LevelsCache m h -> Vector (Bloom SerialisedKey)
cachedFilters :: !(V.Vector (Bloom SerialisedKey))
, forall (m :: * -> *) h. LevelsCache m h -> Vector Index
cachedIndexes :: !(V.Vector Index)
, forall (m :: * -> *) h. LevelsCache m h -> Vector (Handle h)
cachedKOpsFiles :: !(V.Vector (FS.Handle h))
}
{-# SPECIALISE mkLevelsCache ::
ActionRegistry IO
-> Levels IO h
-> IO (LevelsCache IO h) #-}
mkLevelsCache ::
forall m h. (PrimMonad m, MonadMVar m, MonadMask m)
=> ActionRegistry m
-> Levels m h
-> m (LevelsCache m h)
mkLevelsCache :: forall (m :: * -> *) h.
(PrimMonad m, MonadMVar m, MonadMask m) =>
ActionRegistry m -> Levels m h -> m (LevelsCache m h)
mkLevelsCache ActionRegistry m
reg Levels m h
lvls = do
Vector (Ref (Run m h))
rs <- (Ref (Run m h) -> m (Vector (Ref (Run m h))))
-> (Ref (MergingRun LevelMergeType m h)
-> m (Vector (Ref (Run m h))))
-> Levels m h
-> m (Vector (Ref (Run m h)))
forall a.
Monoid a =>
(Ref (Run m h) -> m a)
-> (Ref (MergingRun LevelMergeType m h) -> m a)
-> Levels m h
-> m a
foldRunAndMergeM
((Ref (Run m h) -> Vector (Ref (Run m h)))
-> m (Ref (Run m h)) -> m (Vector (Ref (Run m h)))
forall a b. (a -> b) -> m a -> m b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Ref (Run m h) -> Vector (Ref (Run m h))
forall a. a -> Vector a
V.singleton (m (Ref (Run m h)) -> m (Vector (Ref (Run m h))))
-> (Ref (Run m h) -> m (Ref (Run m h)))
-> Ref (Run m h)
-> m (Vector (Ref (Run m h)))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Ref (Run m h) -> m (Ref (Run m h))
dupRun)
(\Ref (MergingRun LevelMergeType m h)
mr -> ActionRegistry m
-> m (Vector (Ref (Run m h)))
-> (Vector (Ref (Run m h)) -> m ())
-> m (Vector (Ref (Run m h)))
forall (m :: * -> *) a.
(PrimMonad m, MonadMask m, HasCallStack) =>
ActionRegistry m -> m a -> (a -> m ()) -> m a
withRollback ActionRegistry m
reg (Ref (MergingRun LevelMergeType m h) -> m (Vector (Ref (Run m h)))
forall (m :: * -> *) t h.
(PrimMonad m, MonadMVar m, MonadMask m) =>
Ref (MergingRun t m h) -> m (Vector (Ref (Run m h)))
MR.duplicateRuns Ref (MergingRun LevelMergeType m h)
mr) ((Ref (Run m h) -> m ()) -> Vector (Ref (Run m h)) -> m ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> Vector a -> m ()
V.mapM_ Ref (Run m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef))
Levels m h
lvls
LevelsCache m h -> m (LevelsCache m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (LevelsCache m h -> m (LevelsCache m h))
-> LevelsCache m h -> m (LevelsCache m h)
forall a b. (a -> b) -> a -> b
$! LevelsCache_ {
cachedRuns :: Vector (Ref (Run m h))
cachedRuns = Vector (Ref (Run m h))
rs
, cachedFilters :: Vector (Bloom SerialisedKey)
cachedFilters = (Ref (Run m h) -> Bloom SerialisedKey)
-> Vector (Ref (Run m h)) -> Vector (Bloom SerialisedKey)
forall a b. (a -> b) -> Vector a -> Vector b
mapStrict (\(DeRef Run m h
r) -> Run m h -> Bloom SerialisedKey
forall (m :: * -> *) h. Run m h -> Bloom SerialisedKey
Run.runFilter Run m h
r) Vector (Ref (Run m h))
rs
, cachedIndexes :: Vector Index
cachedIndexes = (Ref (Run m h) -> Index) -> Vector (Ref (Run m h)) -> Vector Index
forall a b. (a -> b) -> Vector a -> Vector b
mapStrict (\(DeRef Run m h
r) -> Run m h -> Index
forall (m :: * -> *) h. Run m h -> Index
Run.runIndex Run m h
r) Vector (Ref (Run m h))
rs
, cachedKOpsFiles :: Vector (Handle h)
cachedKOpsFiles = (Ref (Run m h) -> Handle h)
-> Vector (Ref (Run m h)) -> Vector (Handle h)
forall a b. (a -> b) -> Vector a -> Vector b
mapStrict (\(DeRef Run m h
r) -> Run m h -> Handle h
forall (m :: * -> *) h. Run m h -> Handle h
Run.runKOpsFile Run m h
r) Vector (Ref (Run m h))
rs
}
where
dupRun :: Ref (Run m h) -> m (Ref (Run m h))
dupRun Ref (Run m h)
r = ActionRegistry m
-> m (Ref (Run m h))
-> (Ref (Run m h) -> m ())
-> m (Ref (Run m h))
forall (m :: * -> *) a.
(PrimMonad m, MonadMask m, HasCallStack) =>
ActionRegistry m -> m a -> (a -> m ()) -> m a
withRollback ActionRegistry m
reg (Ref (Run m h) -> m (Ref (Run m h))
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadThrow m, HasCallStack) =>
Ref obj -> m (Ref obj)
dupRef Ref (Run m h)
r) Ref (Run m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef
foldRunAndMergeM ::
Monoid a
=> (Ref (Run m h) -> m a)
-> (Ref (MergingRun MR.LevelMergeType m h) -> m a)
-> Levels m h
-> m a
foldRunAndMergeM :: forall a.
Monoid a =>
(Ref (Run m h) -> m a)
-> (Ref (MergingRun LevelMergeType m h) -> m a)
-> Levels m h
-> m a
foldRunAndMergeM Ref (Run m h) -> m a
k1 Ref (MergingRun LevelMergeType m h) -> m a
k2 Levels m h
ls =
(Vector a -> a) -> m (Vector a) -> m a
forall a b. (a -> b) -> m a -> m b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Vector a -> a
forall m. Monoid m => Vector m -> m
forall (t :: * -> *) m. (Foldable t, Monoid m) => t m -> m
fold (m (Vector a) -> m a) -> m (Vector a) -> m a
forall a b. (a -> b) -> a -> b
$ Levels m h -> (Level m h -> m a) -> m (Vector a)
forall (m :: * -> *) a b.
Monad m =>
Vector a -> (a -> m b) -> m (Vector b)
forMStrict Levels m h
ls ((Level m h -> m a) -> m (Vector a))
-> (Level m h -> m a) -> m (Vector a)
forall a b. (a -> b) -> a -> b
$ \(Level IncomingRun m h
ir Vector (Ref (Run m h))
rs) -> do
a
incoming <- case IncomingRun m h
ir of
Single Ref (Run m h)
r -> Ref (Run m h) -> m a
k1 Ref (Run m h)
r
Merging MergePolicyForLevel
_ NominalDebt
_ PrimVar (PrimState m) NominalCredits
_ Ref (MergingRun LevelMergeType m h)
mr -> Ref (MergingRun LevelMergeType m h) -> m a
k2 Ref (MergingRun LevelMergeType m h)
mr
(a
incoming <>) (a -> a) -> (Vector a -> a) -> Vector a -> a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Vector a -> a
forall m. Monoid m => Vector m -> m
forall (t :: * -> *) m. (Foldable t, Monoid m) => t m -> m
fold (Vector a -> a) -> m (Vector a) -> m a
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Vector (Ref (Run m h)) -> (Ref (Run m h) -> m a) -> m (Vector a)
forall (m :: * -> *) a b.
Monad m =>
Vector a -> (a -> m b) -> m (Vector b)
V.forM Vector (Ref (Run m h))
rs Ref (Run m h) -> m a
k1
{-# SPECIALISE rebuildCache ::
ActionRegistry IO
-> LevelsCache IO h
-> Levels IO h
-> IO (LevelsCache IO h) #-}
rebuildCache ::
(PrimMonad m, MonadMVar m, MonadMask m)
=> ActionRegistry m
-> LevelsCache m h
-> Levels m h
-> m (LevelsCache m h)
rebuildCache :: forall (m :: * -> *) h.
(PrimMonad m, MonadMVar m, MonadMask m) =>
ActionRegistry m
-> LevelsCache m h -> Levels m h -> m (LevelsCache m h)
rebuildCache ActionRegistry m
reg LevelsCache m h
oldCache Levels m h
newLevels = do
ActionRegistry m -> LevelsCache m h -> m ()
forall (m :: * -> *) h.
(PrimMonad m, MonadMask m) =>
ActionRegistry m -> LevelsCache m h -> m ()
releaseLevelsCache ActionRegistry m
reg LevelsCache m h
oldCache
ActionRegistry m -> Levels m h -> m (LevelsCache m h)
forall (m :: * -> *) h.
(PrimMonad m, MonadMVar m, MonadMask m) =>
ActionRegistry m -> Levels m h -> m (LevelsCache m h)
mkLevelsCache ActionRegistry m
reg Levels m h
newLevels
{-# SPECIALISE duplicateLevelsCache ::
ActionRegistry IO
-> LevelsCache IO h
-> IO (LevelsCache IO h) #-}
duplicateLevelsCache ::
(PrimMonad m, MonadMask m)
=> ActionRegistry m
-> LevelsCache m h
-> m (LevelsCache m h)
duplicateLevelsCache :: forall (m :: * -> *) h.
(PrimMonad m, MonadMask m) =>
ActionRegistry m -> LevelsCache m h -> m (LevelsCache m h)
duplicateLevelsCache ActionRegistry m
reg LevelsCache m h
cache = do
Vector (Ref (Run m h))
rs' <- Vector (Ref (Run m h))
-> (Ref (Run m h) -> m (Ref (Run m h)))
-> m (Vector (Ref (Run m h)))
forall (m :: * -> *) a b.
Monad m =>
Vector a -> (a -> m b) -> m (Vector b)
forMStrict (LevelsCache m h -> Vector (Ref (Run m h))
forall (m :: * -> *) h. LevelsCache m h -> Vector (Ref (Run m h))
cachedRuns LevelsCache m h
cache) ((Ref (Run m h) -> m (Ref (Run m h)))
-> m (Vector (Ref (Run m h))))
-> (Ref (Run m h) -> m (Ref (Run m h)))
-> m (Vector (Ref (Run m h)))
forall a b. (a -> b) -> a -> b
$ \Ref (Run m h)
r ->
ActionRegistry m
-> m (Ref (Run m h))
-> (Ref (Run m h) -> m ())
-> m (Ref (Run m h))
forall (m :: * -> *) a.
(PrimMonad m, MonadMask m, HasCallStack) =>
ActionRegistry m -> m a -> (a -> m ()) -> m a
withRollback ActionRegistry m
reg (Ref (Run m h) -> m (Ref (Run m h))
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadThrow m, HasCallStack) =>
Ref obj -> m (Ref obj)
dupRef Ref (Run m h)
r) Ref (Run m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef
LevelsCache m h -> m (LevelsCache m h)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return LevelsCache m h
cache { cachedRuns = rs' }
{-# SPECIALISE releaseLevelsCache ::
ActionRegistry IO
-> LevelsCache IO h
-> IO () #-}
releaseLevelsCache ::
(PrimMonad m, MonadMask m)
=> ActionRegistry m
-> LevelsCache m h
-> m ()
releaseLevelsCache :: forall (m :: * -> *) h.
(PrimMonad m, MonadMask m) =>
ActionRegistry m -> LevelsCache m h -> m ()
releaseLevelsCache ActionRegistry m
reg LevelsCache m h
cache =
Vector (Ref (Run m h)) -> (Ref (Run m h) -> m ()) -> m ()
forall (m :: * -> *) a b. Monad m => Vector a -> (a -> m b) -> m ()
V.forM_ (LevelsCache m h -> Vector (Ref (Run m h))
forall (m :: * -> *) h. LevelsCache m h -> Vector (Ref (Run m h))
cachedRuns LevelsCache m h
cache) ((Ref (Run m h) -> m ()) -> m ())
-> (Ref (Run m h) -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \Ref (Run m h)
r ->
ActionRegistry m -> m () -> m ()
forall (m :: * -> *).
(PrimMonad m, HasCallStack) =>
ActionRegistry m -> m () -> m ()
delayedCommit ActionRegistry m
reg (Ref (Run m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef Ref (Run m h)
r)
type Levels m h = V.Vector (Level m h)
data Level m h = Level {
forall (m :: * -> *) h. Level m h -> IncomingRun m h
incomingRun :: !(IncomingRun m h)
, forall (m :: * -> *) h. Level m h -> Vector (Ref (Run m h))
residentRuns :: !(V.Vector (Ref (Run m h)))
}
{-# SPECIALISE duplicateLevels :: ActionRegistry IO -> Levels IO h -> IO (Levels IO h) #-}
duplicateLevels ::
(PrimMonad m, MonadMask m)
=> ActionRegistry m
-> Levels m h
-> m (Levels m h)
duplicateLevels :: forall (m :: * -> *) h.
(PrimMonad m, MonadMask m) =>
ActionRegistry m -> Levels m h -> m (Levels m h)
duplicateLevels ActionRegistry m
reg Levels m h
levels =
Levels m h -> (Level m h -> m (Level m h)) -> m (Levels m h)
forall (m :: * -> *) a b.
Monad m =>
Vector a -> (a -> m b) -> m (Vector b)
forMStrict Levels m h
levels ((Level m h -> m (Level m h)) -> m (Levels m h))
-> (Level m h -> m (Level m h)) -> m (Levels m h)
forall a b. (a -> b) -> a -> b
$ \Level {IncomingRun m h
incomingRun :: forall (m :: * -> *) h. Level m h -> IncomingRun m h
incomingRun :: IncomingRun m h
incomingRun, Vector (Ref (Run m h))
residentRuns :: forall (m :: * -> *) h. Level m h -> Vector (Ref (Run m h))
residentRuns :: Vector (Ref (Run m h))
residentRuns} -> do
IncomingRun m h
incomingRun' <- ActionRegistry m
-> m (IncomingRun m h)
-> (IncomingRun m h -> m ())
-> m (IncomingRun m h)
forall (m :: * -> *) a.
(PrimMonad m, MonadMask m, HasCallStack) =>
ActionRegistry m -> m a -> (a -> m ()) -> m a
withRollback ActionRegistry m
reg (IncomingRun m h -> m (IncomingRun m h)
forall (m :: * -> *) h.
(PrimMonad m, MonadMask m) =>
IncomingRun m h -> m (IncomingRun m h)
duplicateIncomingRun IncomingRun m h
incomingRun) IncomingRun m h -> m ()
forall (m :: * -> *) h.
(PrimMonad m, MonadMask m) =>
IncomingRun m h -> m ()
releaseIncomingRun
Vector (Ref (Run m h))
residentRuns' <- Vector (Ref (Run m h))
-> (Ref (Run m h) -> m (Ref (Run m h)))
-> m (Vector (Ref (Run m h)))
forall (m :: * -> *) a b.
Monad m =>
Vector a -> (a -> m b) -> m (Vector b)
forMStrict Vector (Ref (Run m h))
residentRuns ((Ref (Run m h) -> m (Ref (Run m h)))
-> m (Vector (Ref (Run m h))))
-> (Ref (Run m h) -> m (Ref (Run m h)))
-> m (Vector (Ref (Run m h)))
forall a b. (a -> b) -> a -> b
$ \Ref (Run m h)
r ->
ActionRegistry m
-> m (Ref (Run m h))
-> (Ref (Run m h) -> m ())
-> m (Ref (Run m h))
forall (m :: * -> *) a.
(PrimMonad m, MonadMask m, HasCallStack) =>
ActionRegistry m -> m a -> (a -> m ()) -> m a
withRollback ActionRegistry m
reg (Ref (Run m h) -> m (Ref (Run m h))
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadThrow m, HasCallStack) =>
Ref obj -> m (Ref obj)
dupRef Ref (Run m h)
r) Ref (Run m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef
Level m h -> m (Level m h)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Level m h -> m (Level m h)) -> Level m h -> m (Level m h)
forall a b. (a -> b) -> a -> b
$! Level {
incomingRun :: IncomingRun m h
incomingRun = IncomingRun m h
incomingRun',
residentRuns :: Vector (Ref (Run m h))
residentRuns = Vector (Ref (Run m h))
residentRuns'
}
{-# SPECIALISE releaseLevels :: ActionRegistry IO -> Levels IO h -> IO () #-}
releaseLevels ::
(PrimMonad m, MonadMask m)
=> ActionRegistry m
-> Levels m h
-> m ()
releaseLevels :: forall (m :: * -> *) h.
(PrimMonad m, MonadMask m) =>
ActionRegistry m -> Levels m h -> m ()
releaseLevels ActionRegistry m
reg Levels m h
levels =
Levels m h -> (Level m h -> m ()) -> m ()
forall (m :: * -> *) a b. Monad m => Vector a -> (a -> m b) -> m ()
V.forM_ Levels m h
levels ((Level m h -> m ()) -> m ()) -> (Level m h -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \Level {IncomingRun m h
incomingRun :: forall (m :: * -> *) h. Level m h -> IncomingRun m h
incomingRun :: IncomingRun m h
incomingRun, Vector (Ref (Run m h))
residentRuns :: forall (m :: * -> *) h. Level m h -> Vector (Ref (Run m h))
residentRuns :: Vector (Ref (Run m h))
residentRuns} -> do
ActionRegistry m -> m () -> m ()
forall (m :: * -> *).
(PrimMonad m, HasCallStack) =>
ActionRegistry m -> m () -> m ()
delayedCommit ActionRegistry m
reg (IncomingRun m h -> m ()
forall (m :: * -> *) h.
(PrimMonad m, MonadMask m) =>
IncomingRun m h -> m ()
releaseIncomingRun IncomingRun m h
incomingRun)
(Ref (Run m h) -> m ()) -> Vector (Ref (Run m h)) -> m ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> Vector a -> m ()
V.mapM_ (ActionRegistry m -> m () -> m ()
forall (m :: * -> *).
(PrimMonad m, HasCallStack) =>
ActionRegistry m -> m () -> m ()
delayedCommit ActionRegistry m
reg (m () -> m ()) -> (Ref (Run m h) -> m ()) -> Ref (Run m h) -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Ref (Run m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef) Vector (Ref (Run m h))
residentRuns
{-# SPECIALISE iforLevelM_ :: Levels IO h -> (LevelNo -> Level IO h -> IO ()) -> IO () #-}
iforLevelM_ :: Monad m => Levels m h -> (LevelNo -> Level m h -> m ()) -> m ()
iforLevelM_ :: forall (m :: * -> *) h.
Monad m =>
Levels m h -> (LevelNo -> Level m h -> m ()) -> m ()
iforLevelM_ Levels m h
lvls LevelNo -> Level m h -> m ()
k = Levels m h -> (Int -> Level m h -> m ()) -> m ()
forall (m :: * -> *) a b.
Monad m =>
Vector a -> (Int -> a -> m b) -> m ()
V.iforM_ Levels m h
lvls ((Int -> Level m h -> m ()) -> m ())
-> (Int -> Level m h -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \Int
i Level m h
lvl -> LevelNo -> Level m h -> m ()
k (Int -> LevelNo
LevelNo (Int
i Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1)) Level m h
lvl
data UnionLevel m h =
NoUnion
| Union !(Ref (MergingTree m h))
{-# SPECIALISE duplicateUnionLevel ::
ActionRegistry IO
-> UnionLevel IO h
-> IO (UnionLevel IO h) #-}
duplicateUnionLevel ::
(PrimMonad m, MonadMask m)
=> ActionRegistry m
-> UnionLevel m h
-> m (UnionLevel m h)
duplicateUnionLevel :: forall (m :: * -> *) h.
(PrimMonad m, MonadMask m) =>
ActionRegistry m -> UnionLevel m h -> m (UnionLevel m h)
duplicateUnionLevel ActionRegistry m
reg UnionLevel m h
ul =
case UnionLevel m h
ul of
UnionLevel m h
NoUnion -> UnionLevel m h -> m (UnionLevel m h)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return UnionLevel m h
ul
Union Ref (MergingTree m h)
tree -> Ref (MergingTree m h) -> UnionLevel m h
forall (m :: * -> *) h. Ref (MergingTree m h) -> UnionLevel m h
Union (Ref (MergingTree m h) -> UnionLevel m h)
-> m (Ref (MergingTree m h)) -> m (UnionLevel m h)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> ActionRegistry m
-> m (Ref (MergingTree m h))
-> (Ref (MergingTree m h) -> m ())
-> m (Ref (MergingTree m h))
forall (m :: * -> *) a.
(PrimMonad m, MonadMask m, HasCallStack) =>
ActionRegistry m -> m a -> (a -> m ()) -> m a
withRollback ActionRegistry m
reg (Ref (MergingTree m h) -> m (Ref (MergingTree m h))
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadThrow m, HasCallStack) =>
Ref obj -> m (Ref obj)
dupRef Ref (MergingTree m h)
tree) Ref (MergingTree m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef
{-# SPECIALISE releaseUnionLevel ::
ActionRegistry IO
-> UnionLevel IO h
-> IO () #-}
releaseUnionLevel ::
(PrimMonad m, MonadMask m)
=> ActionRegistry m
-> UnionLevel m h
-> m ()
releaseUnionLevel :: forall (m :: * -> *) h.
(PrimMonad m, MonadMask m) =>
ActionRegistry m -> UnionLevel m h -> m ()
releaseUnionLevel ActionRegistry m
_ UnionLevel m h
NoUnion = () -> m ()
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
releaseUnionLevel ActionRegistry m
reg (Union Ref (MergingTree m h)
tree) = ActionRegistry m -> m () -> m ()
forall (m :: * -> *).
(PrimMonad m, HasCallStack) =>
ActionRegistry m -> m () -> m ()
delayedCommit ActionRegistry m
reg (Ref (MergingTree m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef Ref (MergingTree m h)
tree)
{-# SPECIALISE updatesWithInterleavedFlushes ::
Tracer IO (AtLevel MergeTrace)
-> TableConfig
-> ResolveSerialisedValue
-> HasFS IO h
-> HasBlockIO IO h
-> SessionRoot
-> UniqCounter IO
-> V.Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)
-> ActionRegistry IO
-> TableContent IO h
-> IO (TableContent IO h) #-}
updatesWithInterleavedFlushes ::
forall m h.
(MonadMask m, MonadMVar m, MonadSTM m, MonadST m)
=> Tracer m (AtLevel MergeTrace)
-> TableConfig
-> ResolveSerialisedValue
-> HasFS m h
-> HasBlockIO m h
-> SessionRoot
-> UniqCounter m
-> V.Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)
-> ActionRegistry m
-> TableContent m h
-> m (TableContent m h)
updatesWithInterleavedFlushes :: forall (m :: * -> *) h.
(MonadMask m, MonadMVar m, MonadSTM m, MonadST m) =>
Tracer m (AtLevel MergeTrace)
-> TableConfig
-> ResolveSerialisedValue
-> HasFS m h
-> HasBlockIO m h
-> SessionRoot
-> UniqCounter m
-> Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)
-> ActionRegistry m
-> TableContent m h
-> m (TableContent m h)
updatesWithInterleavedFlushes Tracer m (AtLevel MergeTrace)
tr TableConfig
conf ResolveSerialisedValue
resolve HasFS m h
hfs HasBlockIO m h
hbio SessionRoot
root UniqCounter m
uc Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)
es ActionRegistry m
reg TableContent m h
tc = do
let wb :: WriteBuffer
wb = TableContent m h -> WriteBuffer
forall (m :: * -> *) h. TableContent m h -> WriteBuffer
tableWriteBuffer TableContent m h
tc
wbblobs :: Ref (WriteBufferBlobs m h)
wbblobs = TableContent m h -> Ref (WriteBufferBlobs m h)
forall (m :: * -> *) h.
TableContent m h -> Ref (WriteBufferBlobs m h)
tableWriteBufferBlobs TableContent m h
tc
(WriteBuffer
wb', Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)
es') <- HasFS m h
-> ResolveSerialisedValue
-> Ref (WriteBufferBlobs m h)
-> NumEntries
-> WriteBuffer
-> Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)
-> m (WriteBuffer,
Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob))
forall (m :: * -> *) h.
(MonadSTM m, MonadThrow m, PrimMonad m) =>
HasFS m h
-> ResolveSerialisedValue
-> Ref (WriteBufferBlobs m h)
-> NumEntries
-> WriteBuffer
-> Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)
-> m (WriteBuffer,
Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob))
addWriteBufferEntries HasFS m h
hfs ResolveSerialisedValue
resolve Ref (WriteBufferBlobs m h)
wbblobs NumEntries
maxn WriteBuffer
wb Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)
es
let numAdded :: Int
numAdded = NumEntries -> Int
unNumEntries (WriteBuffer -> NumEntries
WB.numEntries WriteBuffer
wb') Int -> Int -> Int
forall a. Num a => a -> a -> a
- NumEntries -> Int
unNumEntries (WriteBuffer -> NumEntries
WB.numEntries WriteBuffer
wb)
TableConfig -> NominalCredits -> Levels m h -> m ()
forall (m :: * -> *) h.
(MonadSTM m, MonadST m, MonadMVar m, MonadMask m) =>
TableConfig -> NominalCredits -> Levels m h -> m ()
supplyCredits TableConfig
conf (Int -> NominalCredits
NominalCredits Int
numAdded) (TableContent m h -> Levels m h
forall (m :: * -> *) h. TableContent m h -> Levels m h
tableLevels TableContent m h
tc)
let tc' :: TableContent m h
tc' = TableContent m h
tc { tableWriteBuffer = wb' }
if WriteBuffer -> NumEntries
WB.numEntries WriteBuffer
wb' NumEntries -> NumEntries -> Bool
forall a. Ord a => a -> a -> Bool
< NumEntries
maxn then do
TableContent m h -> m (TableContent m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (TableContent m h -> m (TableContent m h))
-> TableContent m h -> m (TableContent m h)
forall a b. (a -> b) -> a -> b
$! TableContent m h
tc'
else do
TableContent m h
tc'' <- Tracer m (AtLevel MergeTrace)
-> TableConfig
-> ResolveSerialisedValue
-> HasFS m h
-> HasBlockIO m h
-> SessionRoot
-> UniqCounter m
-> ActionRegistry m
-> TableContent m h
-> m (TableContent m h)
forall (m :: * -> *) h.
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m) =>
Tracer m (AtLevel MergeTrace)
-> TableConfig
-> ResolveSerialisedValue
-> HasFS m h
-> HasBlockIO m h
-> SessionRoot
-> UniqCounter m
-> ActionRegistry m
-> TableContent m h
-> m (TableContent m h)
flushWriteBuffer Tracer m (AtLevel MergeTrace)
tr TableConfig
conf ResolveSerialisedValue
resolve HasFS m h
hfs HasBlockIO m h
hbio SessionRoot
root UniqCounter m
uc ActionRegistry m
reg TableContent m h
tc'
if Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)
-> Bool
forall a. Vector a -> Bool
V.null Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)
es' then
TableContent m h -> m (TableContent m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (TableContent m h -> m (TableContent m h))
-> TableContent m h -> m (TableContent m h)
forall a b. (a -> b) -> a -> b
$! TableContent m h
tc''
else
Tracer m (AtLevel MergeTrace)
-> TableConfig
-> ResolveSerialisedValue
-> HasFS m h
-> HasBlockIO m h
-> SessionRoot
-> UniqCounter m
-> Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)
-> ActionRegistry m
-> TableContent m h
-> m (TableContent m h)
forall (m :: * -> *) h.
(MonadMask m, MonadMVar m, MonadSTM m, MonadST m) =>
Tracer m (AtLevel MergeTrace)
-> TableConfig
-> ResolveSerialisedValue
-> HasFS m h
-> HasBlockIO m h
-> SessionRoot
-> UniqCounter m
-> Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)
-> ActionRegistry m
-> TableContent m h
-> m (TableContent m h)
updatesWithInterleavedFlushes Tracer m (AtLevel MergeTrace)
tr TableConfig
conf ResolveSerialisedValue
resolve HasFS m h
hfs HasBlockIO m h
hbio SessionRoot
root UniqCounter m
uc Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)
es' ActionRegistry m
reg TableContent m h
tc''
where
AllocNumEntries NumEntries
maxn = TableConfig -> WriteBufferAlloc
confWriteBufferAlloc TableConfig
conf
{-# SPECIALISE addWriteBufferEntries ::
HasFS IO h
-> ResolveSerialisedValue
-> Ref (WriteBufferBlobs IO h)
-> NumEntries
-> WriteBuffer
-> V.Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)
-> IO (WriteBuffer, V.Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)) #-}
addWriteBufferEntries ::
(MonadSTM m, MonadThrow m, PrimMonad m)
=> HasFS m h
-> ResolveSerialisedValue
-> Ref (WriteBufferBlobs m h)
-> NumEntries
-> WriteBuffer
-> V.Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)
-> m (WriteBuffer, V.Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob))
addWriteBufferEntries :: forall (m :: * -> *) h.
(MonadSTM m, MonadThrow m, PrimMonad m) =>
HasFS m h
-> ResolveSerialisedValue
-> Ref (WriteBufferBlobs m h)
-> NumEntries
-> WriteBuffer
-> Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)
-> m (WriteBuffer,
Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob))
addWriteBufferEntries HasFS m h
hfs ResolveSerialisedValue
f Ref (WriteBufferBlobs m h)
wbblobs NumEntries
maxn =
\WriteBuffer
wb Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)
es ->
(\ r :: (WriteBuffer,
Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob))
r@(WriteBuffer
wb', Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)
es') ->
Bool
-> (WriteBuffer,
Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob))
-> (WriteBuffer,
Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob))
forall a. HasCallStack => Bool -> a -> a
assert (WriteBuffer -> NumEntries
WB.numEntries WriteBuffer
wb' NumEntries -> NumEntries -> Bool
forall a. Ord a => a -> a -> Bool
<= NumEntries
maxn) ((WriteBuffer,
Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob))
-> (WriteBuffer,
Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)))
-> (WriteBuffer,
Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob))
-> (WriteBuffer,
Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob))
forall a b. (a -> b) -> a -> b
$
Bool
-> (WriteBuffer,
Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob))
-> (WriteBuffer,
Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob))
forall a. HasCallStack => Bool -> a -> a
assert ((WriteBuffer -> NumEntries
WB.numEntries WriteBuffer
wb' NumEntries -> NumEntries -> Bool
forall a. Ord a => a -> a -> Bool
< NumEntries
maxn Bool -> Bool -> Bool
&& Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)
-> Bool
forall a. Vector a -> Bool
V.null Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)
es')
Bool -> Bool -> Bool
|| (WriteBuffer -> NumEntries
WB.numEntries WriteBuffer
wb' NumEntries -> NumEntries -> Bool
forall a. Eq a => a -> a -> Bool
== NumEntries
maxn)) ((WriteBuffer,
Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob))
-> (WriteBuffer,
Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)))
-> (WriteBuffer,
Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob))
-> (WriteBuffer,
Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob))
forall a b. (a -> b) -> a -> b
$
(WriteBuffer,
Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob))
r)
((WriteBuffer,
Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob))
-> (WriteBuffer,
Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)))
-> m (WriteBuffer,
Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob))
-> m (WriteBuffer,
Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> WriteBuffer
-> Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)
-> m (WriteBuffer,
Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob))
go WriteBuffer
wb Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)
es
where
go :: WriteBuffer
-> Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)
-> m (WriteBuffer,
Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob))
go !WriteBuffer
wb !Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)
es
| WriteBuffer -> NumEntries
WB.numEntries WriteBuffer
wb NumEntries -> NumEntries -> Bool
forall a. Ord a => a -> a -> Bool
>= NumEntries
maxn = (WriteBuffer,
Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob))
-> m (WriteBuffer,
Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (WriteBuffer
wb, Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)
es)
| Just ((SerialisedKey
k, Entry SerialisedValue SerialisedBlob
e), Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)
es') <- Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)
-> Maybe
((SerialisedKey, Entry SerialisedValue SerialisedBlob),
Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob))
forall a. Vector a -> Maybe (a, Vector a)
V.uncons Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)
es = do
Entry SerialisedValue BlobSpan
e' <- (SerialisedBlob -> m BlobSpan)
-> Entry SerialisedValue SerialisedBlob
-> m (Entry SerialisedValue BlobSpan)
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f b)
-> Entry SerialisedValue a -> f (Entry SerialisedValue b)
traverse (HasFS m h
-> Ref (WriteBufferBlobs m h) -> SerialisedBlob -> m BlobSpan
forall (m :: * -> *) h.
(PrimMonad m, MonadThrow m) =>
HasFS m h
-> Ref (WriteBufferBlobs m h) -> SerialisedBlob -> m BlobSpan
WBB.addBlob HasFS m h
hfs Ref (WriteBufferBlobs m h)
wbblobs) Entry SerialisedValue SerialisedBlob
e
WriteBuffer
-> Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)
-> m (WriteBuffer,
Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob))
go (ResolveSerialisedValue
-> SerialisedKey
-> Entry SerialisedValue BlobSpan
-> WriteBuffer
-> WriteBuffer
WB.addEntry ResolveSerialisedValue
f SerialisedKey
k Entry SerialisedValue BlobSpan
e' WriteBuffer
wb) Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)
es'
| Bool
otherwise = (WriteBuffer,
Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob))
-> m (WriteBuffer,
Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (WriteBuffer
wb, Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)
es)
{-# SPECIALISE flushWriteBuffer ::
Tracer IO (AtLevel MergeTrace)
-> TableConfig
-> ResolveSerialisedValue
-> HasFS IO h
-> HasBlockIO IO h
-> SessionRoot
-> UniqCounter IO
-> ActionRegistry IO
-> TableContent IO h
-> IO (TableContent IO h) #-}
flushWriteBuffer ::
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m)
=> Tracer m (AtLevel MergeTrace)
-> TableConfig
-> ResolveSerialisedValue
-> HasFS m h
-> HasBlockIO m h
-> SessionRoot
-> UniqCounter m
-> ActionRegistry m
-> TableContent m h
-> m (TableContent m h)
flushWriteBuffer :: forall (m :: * -> *) h.
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m) =>
Tracer m (AtLevel MergeTrace)
-> TableConfig
-> ResolveSerialisedValue
-> HasFS m h
-> HasBlockIO m h
-> SessionRoot
-> UniqCounter m
-> ActionRegistry m
-> TableContent m h
-> m (TableContent m h)
flushWriteBuffer Tracer m (AtLevel MergeTrace)
tr TableConfig
conf ResolveSerialisedValue
resolve HasFS m h
hfs HasBlockIO m h
hbio SessionRoot
root UniqCounter m
uc ActionRegistry m
reg TableContent m h
tc
| WriteBuffer -> Bool
WB.null (TableContent m h -> WriteBuffer
forall (m :: * -> *) h. TableContent m h -> WriteBuffer
tableWriteBuffer TableContent m h
tc) = TableContent m h -> m (TableContent m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure TableContent m h
tc
| Bool
otherwise = do
!Unique
uniq <- UniqCounter m -> m Unique
forall (m :: * -> *). PrimMonad m => UniqCounter m -> m Unique
incrUniqCounter UniqCounter m
uc
let !size :: NumEntries
size = WriteBuffer -> NumEntries
WB.numEntries (TableContent m h -> WriteBuffer
forall (m :: * -> *) h. TableContent m h -> WriteBuffer
tableWriteBuffer TableContent m h
tc)
!ln :: LevelNo
ln = Int -> LevelNo
LevelNo Int
1
(!RunParams
runParams,
RunFsPaths
runPaths) = ActiveDir
-> TableConfig -> Unique -> LevelNo -> (RunParams, RunFsPaths)
mergingRunParamsForLevel
(SessionRoot -> ActiveDir
Paths.activeDir SessionRoot
root) TableConfig
conf Unique
uniq LevelNo
ln
Tracer m (AtLevel MergeTrace) -> AtLevel MergeTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith Tracer m (AtLevel MergeTrace)
tr (AtLevel MergeTrace -> m ()) -> AtLevel MergeTrace -> m ()
forall a b. (a -> b) -> a -> b
$ LevelNo -> MergeTrace -> AtLevel MergeTrace
forall a. LevelNo -> a -> AtLevel a
AtLevel LevelNo
ln (MergeTrace -> AtLevel MergeTrace)
-> MergeTrace -> AtLevel MergeTrace
forall a b. (a -> b) -> a -> b
$
NumEntries -> RunNumber -> RunParams -> MergeTrace
TraceFlushWriteBuffer NumEntries
size (RunFsPaths -> RunNumber
runNumber RunFsPaths
runPaths) RunParams
runParams
Ref (Run m h)
r <- ActionRegistry m
-> m (Ref (Run m h))
-> (Ref (Run m h) -> m ())
-> m (Ref (Run m h))
forall (m :: * -> *) a.
(PrimMonad m, MonadMask m, HasCallStack) =>
ActionRegistry m -> m a -> (a -> m ()) -> m a
withRollback ActionRegistry m
reg
(HasFS m h
-> HasBlockIO m h
-> RunParams
-> RunFsPaths
-> WriteBuffer
-> Ref (WriteBufferBlobs m h)
-> m (Ref (Run m h))
forall (m :: * -> *) h.
(MonadST m, MonadSTM m, MonadMask m) =>
HasFS m h
-> HasBlockIO m h
-> RunParams
-> RunFsPaths
-> WriteBuffer
-> Ref (WriteBufferBlobs m h)
-> m (Ref (Run m h))
Run.fromWriteBuffer
HasFS m h
hfs HasBlockIO m h
hbio
RunParams
runParams RunFsPaths
runPaths
(TableContent m h -> WriteBuffer
forall (m :: * -> *) h. TableContent m h -> WriteBuffer
tableWriteBuffer TableContent m h
tc)
(TableContent m h -> Ref (WriteBufferBlobs m h)
forall (m :: * -> *) h.
TableContent m h -> Ref (WriteBufferBlobs m h)
tableWriteBufferBlobs TableContent m h
tc))
Ref (Run m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef
ActionRegistry m -> m () -> m ()
forall (m :: * -> *).
(PrimMonad m, HasCallStack) =>
ActionRegistry m -> m () -> m ()
delayedCommit ActionRegistry m
reg (Ref (WriteBufferBlobs m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef (TableContent m h -> Ref (WriteBufferBlobs m h)
forall (m :: * -> *) h.
TableContent m h -> Ref (WriteBufferBlobs m h)
tableWriteBufferBlobs TableContent m h
tc))
Ref (WriteBufferBlobs m h)
wbblobs' <- ActionRegistry m
-> m (Ref (WriteBufferBlobs m h))
-> (Ref (WriteBufferBlobs m h) -> m ())
-> m (Ref (WriteBufferBlobs m h))
forall (m :: * -> *) a.
(PrimMonad m, MonadMask m, HasCallStack) =>
ActionRegistry m -> m a -> (a -> m ()) -> m a
withRollback ActionRegistry m
reg (HasFS m h -> FsPath -> m (Ref (WriteBufferBlobs m h))
forall (m :: * -> *) h.
(PrimMonad m, MonadMask m) =>
HasFS m h -> FsPath -> m (Ref (WriteBufferBlobs m h))
WBB.new HasFS m h
hfs (SessionRoot -> Unique -> FsPath
Paths.tableBlobPath SessionRoot
root Unique
uniq))
Ref (WriteBufferBlobs m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef
Levels m h
levels' <- Tracer m (AtLevel MergeTrace)
-> TableConfig
-> ResolveSerialisedValue
-> HasFS m h
-> HasBlockIO m h
-> SessionRoot
-> UniqCounter m
-> Ref (Run m h)
-> ActionRegistry m
-> Levels m h
-> UnionLevel m h
-> m (Levels m h)
forall (m :: * -> *) h.
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m) =>
Tracer m (AtLevel MergeTrace)
-> TableConfig
-> ResolveSerialisedValue
-> HasFS m h
-> HasBlockIO m h
-> SessionRoot
-> UniqCounter m
-> Ref (Run m h)
-> ActionRegistry m
-> Levels m h
-> UnionLevel m h
-> m (Levels m h)
addRunToLevels Tracer m (AtLevel MergeTrace)
tr TableConfig
conf ResolveSerialisedValue
resolve HasFS m h
hfs HasBlockIO m h
hbio SessionRoot
root UniqCounter m
uc Ref (Run m h)
r ActionRegistry m
reg
(TableContent m h -> Levels m h
forall (m :: * -> *) h. TableContent m h -> Levels m h
tableLevels TableContent m h
tc)
(TableContent m h -> UnionLevel m h
forall (m :: * -> *) h. TableContent m h -> UnionLevel m h
tableUnionLevel TableContent m h
tc)
LevelsCache m h
tableCache' <- ActionRegistry m
-> LevelsCache m h -> Levels m h -> m (LevelsCache m h)
forall (m :: * -> *) h.
(PrimMonad m, MonadMVar m, MonadMask m) =>
ActionRegistry m
-> LevelsCache m h -> Levels m h -> m (LevelsCache m h)
rebuildCache ActionRegistry m
reg (TableContent m h -> LevelsCache m h
forall (m :: * -> *) h. TableContent m h -> LevelsCache m h
tableCache TableContent m h
tc) Levels m h
levels'
TableContent m h -> m (TableContent m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (TableContent m h -> m (TableContent m h))
-> TableContent m h -> m (TableContent m h)
forall a b. (a -> b) -> a -> b
$! TableContent {
tableWriteBuffer :: WriteBuffer
tableWriteBuffer = WriteBuffer
WB.empty
, tableWriteBufferBlobs :: Ref (WriteBufferBlobs m h)
tableWriteBufferBlobs = Ref (WriteBufferBlobs m h)
wbblobs'
, tableLevels :: Levels m h
tableLevels = Levels m h
levels'
, tableCache :: LevelsCache m h
tableCache = LevelsCache m h
tableCache'
, tableUnionLevel :: UnionLevel m h
tableUnionLevel = TableContent m h -> UnionLevel m h
forall (m :: * -> *) h. TableContent m h -> UnionLevel m h
tableUnionLevel TableContent m h
tc
}
{-# SPECIALISE addRunToLevels ::
Tracer IO (AtLevel MergeTrace)
-> TableConfig
-> ResolveSerialisedValue
-> HasFS IO h
-> HasBlockIO IO h
-> SessionRoot
-> UniqCounter IO
-> Ref (Run IO h)
-> ActionRegistry IO
-> Levels IO h
-> UnionLevel IO h
-> IO (Levels IO h) #-}
addRunToLevels ::
forall m h.
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m)
=> Tracer m (AtLevel MergeTrace)
-> TableConfig
-> ResolveSerialisedValue
-> HasFS m h
-> HasBlockIO m h
-> SessionRoot
-> UniqCounter m
-> Ref (Run m h)
-> ActionRegistry m
-> Levels m h
-> UnionLevel m h
-> m (Levels m h)
addRunToLevels :: forall (m :: * -> *) h.
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m) =>
Tracer m (AtLevel MergeTrace)
-> TableConfig
-> ResolveSerialisedValue
-> HasFS m h
-> HasBlockIO m h
-> SessionRoot
-> UniqCounter m
-> Ref (Run m h)
-> ActionRegistry m
-> Levels m h
-> UnionLevel m h
-> m (Levels m h)
addRunToLevels Tracer m (AtLevel MergeTrace)
tr conf :: TableConfig
conf@TableConfig{MergeSchedule
DiskCachePolicy
FencePointerIndexType
BloomFilterAlloc
WriteBufferAlloc
SizeRatio
MergePolicy
confWriteBufferAlloc :: TableConfig -> WriteBufferAlloc
confMergePolicy :: MergePolicy
confSizeRatio :: SizeRatio
confWriteBufferAlloc :: WriteBufferAlloc
confBloomFilterAlloc :: BloomFilterAlloc
confFencePointerIndex :: FencePointerIndexType
confDiskCachePolicy :: DiskCachePolicy
confMergeSchedule :: MergeSchedule
confMergePolicy :: TableConfig -> MergePolicy
confSizeRatio :: TableConfig -> SizeRatio
confBloomFilterAlloc :: TableConfig -> BloomFilterAlloc
confFencePointerIndex :: TableConfig -> FencePointerIndexType
confDiskCachePolicy :: TableConfig -> DiskCachePolicy
confMergeSchedule :: TableConfig -> MergeSchedule
..} ResolveSerialisedValue
resolve HasFS m h
hfs HasBlockIO m h
hbio SessionRoot
root UniqCounter m
uc Ref (Run m h)
r0 ActionRegistry m
reg Levels m h
levels UnionLevel m h
ul = do
LevelNo -> Vector (Ref (Run m h)) -> Levels m h -> m (Levels m h)
go (Int -> LevelNo
LevelNo Int
1) (Ref (Run m h) -> Vector (Ref (Run m h))
forall a. a -> Vector a
V.singleton Ref (Run m h)
r0) Levels m h
levels
where
go ::
LevelNo
-> V.Vector (Ref (Run m h))
-> V.Vector (Level m h )
-> m (V.Vector (Level m h))
go :: LevelNo -> Vector (Ref (Run m h)) -> Levels m h -> m (Levels m h)
go !LevelNo
ln Vector (Ref (Run m h))
rs (Levels m h -> Maybe (Level m h, Levels m h)
forall a. Vector a -> Maybe (a, Vector a)
V.uncons -> Maybe (Level m h, Levels m h)
Nothing) = do
Tracer m (AtLevel MergeTrace) -> AtLevel MergeTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith Tracer m (AtLevel MergeTrace)
tr (AtLevel MergeTrace -> m ()) -> AtLevel MergeTrace -> m ()
forall a b. (a -> b) -> a -> b
$ LevelNo -> MergeTrace -> AtLevel MergeTrace
forall a. LevelNo -> a -> AtLevel a
AtLevel LevelNo
ln MergeTrace
TraceAddLevel
let policyForLevel :: MergePolicyForLevel
policyForLevel = MergePolicy
-> LevelNo -> Levels m h -> UnionLevel m h -> MergePolicyForLevel
forall (m :: * -> *) h.
MergePolicy
-> LevelNo -> Levels m h -> UnionLevel m h -> MergePolicyForLevel
mergePolicyForLevel MergePolicy
confMergePolicy LevelNo
ln Levels m h
forall a. Vector a
V.empty UnionLevel m h
ul
IncomingRun m h
ir <- MergePolicyForLevel
-> LevelMergeType
-> LevelNo
-> Vector (Ref (Run m h))
-> m (IncomingRun m h)
newMerge MergePolicyForLevel
policyForLevel LevelMergeType
MR.MergeLastLevel LevelNo
ln Vector (Ref (Run m h))
rs
Levels m h -> m (Levels m h)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Levels m h -> m (Levels m h)) -> Levels m h -> m (Levels m h)
forall a b. (a -> b) -> a -> b
$! Level m h -> Levels m h
forall a. a -> Vector a
V.singleton (Level m h -> Levels m h) -> Level m h -> Levels m h
forall a b. (a -> b) -> a -> b
$ IncomingRun m h -> Vector (Ref (Run m h)) -> Level m h
forall (m :: * -> *) h.
IncomingRun m h -> Vector (Ref (Run m h)) -> Level m h
Level IncomingRun m h
ir Vector (Ref (Run m h))
forall a. Vector a
V.empty
go !LevelNo
ln Vector (Ref (Run m h))
rs' (Levels m h -> Maybe (Level m h, Levels m h)
forall a. Vector a -> Maybe (a, Vector a)
V.uncons -> Just (Level IncomingRun m h
ir Vector (Ref (Run m h))
rs, Levels m h
ls)) = do
Ref (Run m h)
r <- LevelNo -> IncomingRun m h -> m (Ref (Run m h))
expectCompletedMerge LevelNo
ln IncomingRun m h
ir
case MergePolicy
-> LevelNo -> Levels m h -> UnionLevel m h -> MergePolicyForLevel
forall (m :: * -> *) h.
MergePolicy
-> LevelNo -> Levels m h -> UnionLevel m h -> MergePolicyForLevel
mergePolicyForLevel MergePolicy
confMergePolicy LevelNo
ln Levels m h
ls UnionLevel m h
ul of
MergePolicyForLevel
LevelTiering | Ref (Run m h) -> NumEntries
forall (m :: * -> *) h. Ref (Run m h) -> NumEntries
Run.size Ref (Run m h)
r NumEntries -> NumEntries -> Bool
forall a. Ord a => a -> a -> Bool
<= TableConfig -> MergePolicyForLevel -> LevelNo -> NumEntries
maxRunSize' TableConfig
conf MergePolicyForLevel
LevelTiering (LevelNo -> LevelNo
forall a. Enum a => a -> a
pred LevelNo
ln) -> do
let mergeType :: LevelMergeType
mergeType = Levels m h -> UnionLevel m h -> LevelMergeType
forall (m :: * -> *) h.
Levels m h -> UnionLevel m h -> LevelMergeType
mergeTypeForLevel Levels m h
ls UnionLevel m h
ul
IncomingRun m h
ir' <- MergePolicyForLevel
-> LevelMergeType
-> LevelNo
-> Vector (Ref (Run m h))
-> m (IncomingRun m h)
newMerge MergePolicyForLevel
LevelTiering LevelMergeType
mergeType LevelNo
ln (Vector (Ref (Run m h))
rs' Vector (Ref (Run m h)) -> Ref (Run m h) -> Vector (Ref (Run m h))
forall a. Vector a -> a -> Vector a
`V.snoc` Ref (Run m h)
r)
Levels m h -> m (Levels m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Levels m h -> m (Levels m h)) -> Levels m h -> m (Levels m h)
forall a b. (a -> b) -> a -> b
$! IncomingRun m h -> Vector (Ref (Run m h)) -> Level m h
forall (m :: * -> *) h.
IncomingRun m h -> Vector (Ref (Run m h)) -> Level m h
Level IncomingRun m h
ir' Vector (Ref (Run m h))
rs Level m h -> Levels m h -> Levels m h
forall a. a -> Vector a -> Vector a
`V.cons` Levels m h
ls
MergePolicyForLevel
LevelTiering | SizeRatio -> Vector (Ref (Run m h)) -> Bool
forall run. SizeRatio -> Vector run -> Bool
levelIsFull SizeRatio
confSizeRatio Vector (Ref (Run m h))
rs -> do
IncomingRun m h
ir' <- MergePolicyForLevel
-> LevelMergeType
-> LevelNo
-> Vector (Ref (Run m h))
-> m (IncomingRun m h)
newMerge MergePolicyForLevel
LevelTiering LevelMergeType
MR.MergeMidLevel LevelNo
ln Vector (Ref (Run m h))
rs'
Levels m h
ls' <- LevelNo -> Vector (Ref (Run m h)) -> Levels m h -> m (Levels m h)
go (LevelNo -> LevelNo
forall a. Enum a => a -> a
succ LevelNo
ln) (Ref (Run m h)
r Ref (Run m h) -> Vector (Ref (Run m h)) -> Vector (Ref (Run m h))
forall a. a -> Vector a -> Vector a
`V.cons` Vector (Ref (Run m h))
rs) Levels m h
ls
Levels m h -> m (Levels m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Levels m h -> m (Levels m h)) -> Levels m h -> m (Levels m h)
forall a b. (a -> b) -> a -> b
$! IncomingRun m h -> Vector (Ref (Run m h)) -> Level m h
forall (m :: * -> *) h.
IncomingRun m h -> Vector (Ref (Run m h)) -> Level m h
Level IncomingRun m h
ir' Vector (Ref (Run m h))
forall a. Vector a
V.empty Level m h -> Levels m h -> Levels m h
forall a. a -> Vector a -> Vector a
`V.cons` Levels m h
ls'
MergePolicyForLevel
LevelTiering -> do
let mergeType :: LevelMergeType
mergeType = Levels m h -> UnionLevel m h -> LevelMergeType
forall (m :: * -> *) h.
Levels m h -> UnionLevel m h -> LevelMergeType
mergeTypeForLevel Levels m h
ls UnionLevel m h
ul
IncomingRun m h
ir' <- MergePolicyForLevel
-> LevelMergeType
-> LevelNo
-> Vector (Ref (Run m h))
-> m (IncomingRun m h)
newMerge MergePolicyForLevel
LevelTiering LevelMergeType
mergeType LevelNo
ln Vector (Ref (Run m h))
rs'
Tracer m (AtLevel MergeTrace) -> AtLevel MergeTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith Tracer m (AtLevel MergeTrace)
tr (AtLevel MergeTrace -> m ()) -> AtLevel MergeTrace -> m ()
forall a b. (a -> b) -> a -> b
$ LevelNo -> MergeTrace -> AtLevel MergeTrace
forall a. LevelNo -> a -> AtLevel a
AtLevel LevelNo
ln
(MergeTrace -> AtLevel MergeTrace)
-> MergeTrace -> AtLevel MergeTrace
forall a b. (a -> b) -> a -> b
$ RunNumber -> Vector RunNumber -> MergeTrace
TraceAddRun
(Ref (Run m h) -> RunNumber
forall (m :: * -> *) h. Ref (Run m h) -> RunNumber
Run.runFsPathsNumber Ref (Run m h)
r)
((Ref (Run m h) -> RunNumber)
-> Vector (Ref (Run m h)) -> Vector RunNumber
forall a b. (a -> b) -> Vector a -> Vector b
V.map Ref (Run m h) -> RunNumber
forall (m :: * -> *) h. Ref (Run m h) -> RunNumber
Run.runFsPathsNumber Vector (Ref (Run m h))
rs)
Levels m h -> m (Levels m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Levels m h -> m (Levels m h)) -> Levels m h -> m (Levels m h)
forall a b. (a -> b) -> a -> b
$! IncomingRun m h -> Vector (Ref (Run m h)) -> Level m h
forall (m :: * -> *) h.
IncomingRun m h -> Vector (Ref (Run m h)) -> Level m h
Level IncomingRun m h
ir' (Ref (Run m h)
r Ref (Run m h) -> Vector (Ref (Run m h)) -> Vector (Ref (Run m h))
forall a. a -> Vector a -> Vector a
`V.cons` Vector (Ref (Run m h))
rs) Level m h -> Levels m h -> Levels m h
forall a. a -> Vector a -> Vector a
`V.cons` Levels m h
ls
MergePolicyForLevel
LevelLevelling | Ref (Run m h) -> NumEntries
forall (m :: * -> *) h. Ref (Run m h) -> NumEntries
Run.size Ref (Run m h)
r NumEntries -> NumEntries -> Bool
forall a. Ord a => a -> a -> Bool
> TableConfig -> MergePolicyForLevel -> LevelNo -> NumEntries
maxRunSize' TableConfig
conf MergePolicyForLevel
LevelLevelling LevelNo
ln -> do
Bool -> m () -> m ()
forall a. HasCallStack => Bool -> a -> a
assert (Vector (Ref (Run m h)) -> Bool
forall a. Vector a -> Bool
V.null Vector (Ref (Run m h))
rs Bool -> Bool -> Bool
&& Levels m h -> Bool
forall a. Vector a -> Bool
V.null Levels m h
ls) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ () -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
IncomingRun m h
ir' <- MergePolicyForLevel
-> LevelMergeType
-> LevelNo
-> Vector (Ref (Run m h))
-> m (IncomingRun m h)
newMerge MergePolicyForLevel
LevelTiering LevelMergeType
MR.MergeMidLevel LevelNo
ln Vector (Ref (Run m h))
rs'
Levels m h
ls' <- LevelNo -> Vector (Ref (Run m h)) -> Levels m h -> m (Levels m h)
go (LevelNo -> LevelNo
forall a. Enum a => a -> a
succ LevelNo
ln) (Ref (Run m h) -> Vector (Ref (Run m h))
forall a. a -> Vector a
V.singleton Ref (Run m h)
r) Levels m h
forall a. Vector a
V.empty
Levels m h -> m (Levels m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Levels m h -> m (Levels m h)) -> Levels m h -> m (Levels m h)
forall a b. (a -> b) -> a -> b
$! IncomingRun m h -> Vector (Ref (Run m h)) -> Level m h
forall (m :: * -> *) h.
IncomingRun m h -> Vector (Ref (Run m h)) -> Level m h
Level IncomingRun m h
ir' Vector (Ref (Run m h))
forall a. Vector a
V.empty Level m h -> Levels m h -> Levels m h
forall a. a -> Vector a -> Vector a
`V.cons` Levels m h
ls'
MergePolicyForLevel
LevelLevelling -> do
Bool -> m () -> m ()
forall a. HasCallStack => Bool -> a -> a
assert (Vector (Ref (Run m h)) -> Bool
forall a. Vector a -> Bool
V.null Vector (Ref (Run m h))
rs Bool -> Bool -> Bool
&& Levels m h -> Bool
forall a. Vector a -> Bool
V.null Levels m h
ls) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ () -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
IncomingRun m h
ir' <- MergePolicyForLevel
-> LevelMergeType
-> LevelNo
-> Vector (Ref (Run m h))
-> m (IncomingRun m h)
newMerge MergePolicyForLevel
LevelLevelling LevelMergeType
MR.MergeLastLevel LevelNo
ln (Vector (Ref (Run m h))
rs' Vector (Ref (Run m h)) -> Ref (Run m h) -> Vector (Ref (Run m h))
forall a. Vector a -> a -> Vector a
`V.snoc` Ref (Run m h)
r)
Levels m h -> m (Levels m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Levels m h -> m (Levels m h)) -> Levels m h -> m (Levels m h)
forall a b. (a -> b) -> a -> b
$! IncomingRun m h -> Vector (Ref (Run m h)) -> Level m h
forall (m :: * -> *) h.
IncomingRun m h -> Vector (Ref (Run m h)) -> Level m h
Level IncomingRun m h
ir' Vector (Ref (Run m h))
forall a. Vector a
V.empty Level m h -> Levels m h -> Levels m h
forall a. a -> Vector a -> Vector a
`V.cons` Levels m h
forall a. Vector a
V.empty
expectCompletedMerge :: LevelNo -> IncomingRun m h -> m (Ref (Run m h))
expectCompletedMerge :: LevelNo -> IncomingRun m h -> m (Ref (Run m h))
expectCompletedMerge LevelNo
ln IncomingRun m h
ir = do
Ref (Run m h)
r <- case IncomingRun m h
ir of
Single Ref (Run m h)
r -> Ref (Run m h) -> m (Ref (Run m h))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Ref (Run m h)
r
Merging MergePolicyForLevel
_ NominalDebt
_ PrimVar (PrimState m) NominalCredits
_ Ref (MergingRun LevelMergeType m h)
mr -> do
Ref (Run m h)
r <- ActionRegistry m
-> m (Ref (Run m h))
-> (Ref (Run m h) -> m ())
-> m (Ref (Run m h))
forall (m :: * -> *) a.
(PrimMonad m, MonadMask m, HasCallStack) =>
ActionRegistry m -> m a -> (a -> m ()) -> m a
withRollback ActionRegistry m
reg (Ref (MergingRun LevelMergeType m h) -> m (Ref (Run m h))
forall (m :: * -> *) t h.
(MonadMVar m, MonadSTM m, MonadST m, MonadMask m) =>
Ref (MergingRun t m h) -> m (Ref (Run m h))
MR.expectCompleted Ref (MergingRun LevelMergeType m h)
mr) Ref (Run m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef
ActionRegistry m -> m () -> m ()
forall (m :: * -> *).
(PrimMonad m, HasCallStack) =>
ActionRegistry m -> m () -> m ()
delayedCommit ActionRegistry m
reg (Ref (MergingRun LevelMergeType m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef Ref (MergingRun LevelMergeType m h)
mr)
Ref (Run m h) -> m (Ref (Run m h))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Ref (Run m h)
r
Tracer m (AtLevel MergeTrace) -> AtLevel MergeTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith Tracer m (AtLevel MergeTrace)
tr (AtLevel MergeTrace -> m ()) -> AtLevel MergeTrace -> m ()
forall a b. (a -> b) -> a -> b
$ LevelNo -> MergeTrace -> AtLevel MergeTrace
forall a. LevelNo -> a -> AtLevel a
AtLevel LevelNo
ln (MergeTrace -> AtLevel MergeTrace)
-> MergeTrace -> AtLevel MergeTrace
forall a b. (a -> b) -> a -> b
$
RunNumber -> MergeTrace
TraceExpectCompletedMerge (Ref (Run m h) -> RunNumber
forall (m :: * -> *) h. Ref (Run m h) -> RunNumber
Run.runFsPathsNumber Ref (Run m h)
r)
Ref (Run m h) -> m (Ref (Run m h))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Ref (Run m h)
r
newMerge :: MergePolicyForLevel
-> MR.LevelMergeType
-> LevelNo
-> V.Vector (Ref (Run m h))
-> m (IncomingRun m h)
newMerge :: MergePolicyForLevel
-> LevelMergeType
-> LevelNo
-> Vector (Ref (Run m h))
-> m (IncomingRun m h)
newMerge MergePolicyForLevel
mergePolicy LevelMergeType
mergeType LevelNo
ln Vector (Ref (Run m h))
rs = do
IncomingRun m h
ir <- ActionRegistry m
-> m (IncomingRun m h)
-> (IncomingRun m h -> m ())
-> m (IncomingRun m h)
forall (m :: * -> *) a.
(PrimMonad m, MonadMask m, HasCallStack) =>
ActionRegistry m -> m a -> (a -> m ()) -> m a
withRollback ActionRegistry m
reg
(Tracer m (AtLevel MergeTrace)
-> HasFS m h
-> HasBlockIO m h
-> SessionRoot
-> UniqCounter m
-> TableConfig
-> ResolveSerialisedValue
-> MergePolicyForLevel
-> LevelMergeType
-> LevelNo
-> Vector (Ref (Run m h))
-> m (IncomingRun m h)
forall (m :: * -> *) h.
(MonadMVar m, MonadMask m, MonadSTM m, MonadST m) =>
Tracer m (AtLevel MergeTrace)
-> HasFS m h
-> HasBlockIO m h
-> SessionRoot
-> UniqCounter m
-> TableConfig
-> ResolveSerialisedValue
-> MergePolicyForLevel
-> LevelMergeType
-> LevelNo
-> Vector (Ref (Run m h))
-> m (IncomingRun m h)
newIncomingRunAtLevel Tracer m (AtLevel MergeTrace)
tr HasFS m h
hfs HasBlockIO m h
hbio
SessionRoot
root UniqCounter m
uc TableConfig
conf ResolveSerialisedValue
resolve
MergePolicyForLevel
mergePolicy LevelMergeType
mergeType LevelNo
ln Vector (Ref (Run m h))
rs)
IncomingRun m h -> m ()
forall (m :: * -> *) h.
(PrimMonad m, MonadMask m) =>
IncomingRun m h -> m ()
releaseIncomingRun
Vector (Ref (Run m h)) -> (Ref (Run m h) -> m ()) -> m ()
forall (m :: * -> *) a b. Monad m => Vector a -> (a -> m b) -> m ()
V.forM_ Vector (Ref (Run m h))
rs ((Ref (Run m h) -> m ()) -> m ())
-> (Ref (Run m h) -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \Ref (Run m h)
r -> ActionRegistry m -> m () -> m ()
forall (m :: * -> *).
(PrimMonad m, HasCallStack) =>
ActionRegistry m -> m () -> m ()
delayedCommit ActionRegistry m
reg (Ref (Run m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef Ref (Run m h)
r)
case MergeSchedule
confMergeSchedule of
MergeSchedule
Incremental -> () -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
MergeSchedule
OneShot ->
m (Ref (Run m h))
-> (Ref (Run m h) -> m ()) -> (Ref (Run m h) -> m ()) -> m ()
forall a b c. m a -> (a -> m b) -> (a -> m c) -> m c
forall (m :: * -> *) a b c.
MonadThrow m =>
m a -> (a -> m b) -> (a -> m c) -> m c
bracket
(TableConfig -> LevelNo -> IncomingRun m h -> m (Ref (Run m h))
forall (m :: * -> *) h.
(MonadSTM m, MonadST m, MonadMVar m, MonadMask m) =>
TableConfig -> LevelNo -> IncomingRun m h -> m (Ref (Run m h))
immediatelyCompleteIncomingRun TableConfig
conf LevelNo
ln IncomingRun m h
ir)
Ref (Run m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef ((Ref (Run m h) -> m ()) -> m ())
-> (Ref (Run m h) -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \Ref (Run m h)
r ->
Tracer m (AtLevel MergeTrace) -> AtLevel MergeTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith Tracer m (AtLevel MergeTrace)
tr (AtLevel MergeTrace -> m ()) -> AtLevel MergeTrace -> m ()
forall a b. (a -> b) -> a -> b
$ LevelNo -> MergeTrace -> AtLevel MergeTrace
forall a. LevelNo -> a -> AtLevel a
AtLevel LevelNo
ln (MergeTrace -> AtLevel MergeTrace)
-> MergeTrace -> AtLevel MergeTrace
forall a b. (a -> b) -> a -> b
$
NumEntries -> RunNumber -> MergeTrace
TraceCompletedMerge (Ref (Run m h) -> NumEntries
forall (m :: * -> *) h. Ref (Run m h) -> NumEntries
Run.size Ref (Run m h)
r) (Ref (Run m h) -> RunNumber
forall (m :: * -> *) h. Ref (Run m h) -> RunNumber
Run.runFsPathsNumber Ref (Run m h)
r)
IncomingRun m h -> m (IncomingRun m h)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return IncomingRun m h
ir
{-# SPECIALISE newIncomingRunAtLevel ::
Tracer IO (AtLevel MergeTrace)
-> HasFS IO h
-> HasBlockIO IO h
-> SessionRoot
-> UniqCounter IO
-> TableConfig
-> ResolveSerialisedValue
-> MergePolicyForLevel
-> MR.LevelMergeType
-> LevelNo
-> V.Vector (Ref (Run IO h))
-> IO (IncomingRun IO h) #-}
newIncomingRunAtLevel ::
(MonadMVar m, MonadMask m, MonadSTM m, MonadST m)
=> Tracer m (AtLevel MergeTrace)
-> HasFS m h
-> HasBlockIO m h
-> SessionRoot
-> UniqCounter m
-> TableConfig
-> ResolveSerialisedValue
-> MergePolicyForLevel
-> MR.LevelMergeType
-> LevelNo
-> V.Vector (Ref (Run m h))
-> m (IncomingRun m h)
newIncomingRunAtLevel :: forall (m :: * -> *) h.
(MonadMVar m, MonadMask m, MonadSTM m, MonadST m) =>
Tracer m (AtLevel MergeTrace)
-> HasFS m h
-> HasBlockIO m h
-> SessionRoot
-> UniqCounter m
-> TableConfig
-> ResolveSerialisedValue
-> MergePolicyForLevel
-> LevelMergeType
-> LevelNo
-> Vector (Ref (Run m h))
-> m (IncomingRun m h)
newIncomingRunAtLevel Tracer m (AtLevel MergeTrace)
tr HasFS m h
hfs HasBlockIO m h
hbio
SessionRoot
root UniqCounter m
uc TableConfig
conf ResolveSerialisedValue
resolve
MergePolicyForLevel
mergePolicy LevelMergeType
mergeType LevelNo
ln Vector (Ref (Run m h))
rs
| Just (Ref (Run m h)
r, Vector (Ref (Run m h))
rest) <- Vector (Ref (Run m h))
-> Maybe (Ref (Run m h), Vector (Ref (Run m h)))
forall a. Vector a -> Maybe (a, Vector a)
V.uncons Vector (Ref (Run m h))
rs, Vector (Ref (Run m h)) -> Bool
forall a. Vector a -> Bool
V.null Vector (Ref (Run m h))
rest = do
Tracer m (AtLevel MergeTrace) -> AtLevel MergeTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith Tracer m (AtLevel MergeTrace)
tr (AtLevel MergeTrace -> m ()) -> AtLevel MergeTrace -> m ()
forall a b. (a -> b) -> a -> b
$ LevelNo -> MergeTrace -> AtLevel MergeTrace
forall a. LevelNo -> a -> AtLevel a
AtLevel LevelNo
ln (MergeTrace -> AtLevel MergeTrace)
-> MergeTrace -> AtLevel MergeTrace
forall a b. (a -> b) -> a -> b
$
NumEntries -> RunNumber -> MergeTrace
TraceNewMergeSingleRun (Ref (Run m h) -> NumEntries
forall (m :: * -> *) h. Ref (Run m h) -> NumEntries
Run.size Ref (Run m h)
r) (Ref (Run m h) -> RunNumber
forall (m :: * -> *) h. Ref (Run m h) -> RunNumber
Run.runFsPathsNumber Ref (Run m h)
r)
Ref (Run m h) -> m (IncomingRun m h)
forall (m :: * -> *) h.
(PrimMonad m, MonadThrow m) =>
Ref (Run m h) -> m (IncomingRun m h)
newIncomingSingleRun Ref (Run m h)
r
| Bool
otherwise = do
Unique
uniq <- UniqCounter m -> m Unique
forall (m :: * -> *). PrimMonad m => UniqCounter m -> m Unique
incrUniqCounter UniqCounter m
uc
let (!RunParams
runParams, !RunFsPaths
runPaths) =
ActiveDir
-> TableConfig -> Unique -> LevelNo -> (RunParams, RunFsPaths)
mergingRunParamsForLevel (SessionRoot -> ActiveDir
Paths.activeDir SessionRoot
root) TableConfig
conf Unique
uniq LevelNo
ln
Tracer m (AtLevel MergeTrace) -> AtLevel MergeTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith Tracer m (AtLevel MergeTrace)
tr (AtLevel MergeTrace -> m ()) -> AtLevel MergeTrace -> m ()
forall a b. (a -> b) -> a -> b
$ LevelNo -> MergeTrace -> AtLevel MergeTrace
forall a. LevelNo -> a -> AtLevel a
AtLevel LevelNo
ln (MergeTrace -> AtLevel MergeTrace)
-> MergeTrace -> AtLevel MergeTrace
forall a b. (a -> b) -> a -> b
$
Vector NumEntries
-> RunNumber
-> RunParams
-> MergePolicyForLevel
-> LevelMergeType
-> MergeTrace
TraceNewMerge ((Ref (Run m h) -> NumEntries)
-> Vector (Ref (Run m h)) -> Vector NumEntries
forall a b. (a -> b) -> Vector a -> Vector b
V.map Ref (Run m h) -> NumEntries
forall (m :: * -> *) h. Ref (Run m h) -> NumEntries
Run.size Vector (Ref (Run m h))
rs) (RunFsPaths -> RunNumber
runNumber RunFsPaths
runPaths)
RunParams
runParams MergePolicyForLevel
mergePolicy LevelMergeType
mergeType
m (Ref (MergingRun LevelMergeType m h))
-> (Ref (MergingRun LevelMergeType m h) -> m ())
-> (Ref (MergingRun LevelMergeType m h) -> m (IncomingRun m h))
-> m (IncomingRun m h)
forall a b c. m a -> (a -> m b) -> (a -> m c) -> m c
forall (m :: * -> *) a b c.
MonadThrow m =>
m a -> (a -> m b) -> (a -> m c) -> m c
bracket
(HasFS m h
-> HasBlockIO m h
-> ResolveSerialisedValue
-> RunParams
-> LevelMergeType
-> RunFsPaths
-> Vector (Ref (Run m h))
-> m (Ref (MergingRun LevelMergeType m h))
forall t (m :: * -> *) h.
(IsMergeType t, MonadMVar m, MonadMask m, MonadSTM m, MonadST m) =>
HasFS m h
-> HasBlockIO m h
-> ResolveSerialisedValue
-> RunParams
-> t
-> RunFsPaths
-> Vector (Ref (Run m h))
-> m (Ref (MergingRun t m h))
MR.new HasFS m h
hfs HasBlockIO m h
hbio ResolveSerialisedValue
resolve RunParams
runParams LevelMergeType
mergeType RunFsPaths
runPaths Vector (Ref (Run m h))
rs)
Ref (MergingRun LevelMergeType m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef ((Ref (MergingRun LevelMergeType m h) -> m (IncomingRun m h))
-> m (IncomingRun m h))
-> (Ref (MergingRun LevelMergeType m h) -> m (IncomingRun m h))
-> m (IncomingRun m h)
forall a b. (a -> b) -> a -> b
$ \Ref (MergingRun LevelMergeType m h)
mr ->
Bool -> m (IncomingRun m h) -> m (IncomingRun m h)
forall a. HasCallStack => Bool -> a -> a
assert (Ref (MergingRun LevelMergeType m h) -> MergeDebt
forall t (m :: * -> *) h. Ref (MergingRun t m h) -> MergeDebt
MR.totalMergeDebt Ref (MergingRun LevelMergeType m h)
mr MergeDebt -> MergeDebt -> Bool
forall a. Ord a => a -> a -> Bool
<= TableConfig -> MergePolicyForLevel -> LevelNo -> MergeDebt
maxMergeDebt TableConfig
conf MergePolicyForLevel
mergePolicy LevelNo
ln) (m (IncomingRun m h) -> m (IncomingRun m h))
-> m (IncomingRun m h) -> m (IncomingRun m h)
forall a b. (a -> b) -> a -> b
$
let nominalDebt :: NominalDebt
nominalDebt = TableConfig -> LevelNo -> NominalDebt
nominalDebtForLevel TableConfig
conf LevelNo
ln in
MergePolicyForLevel
-> NominalDebt
-> Ref (MergingRun LevelMergeType m h)
-> m (IncomingRun m h)
forall (m :: * -> *) h.
(PrimMonad m, MonadThrow m) =>
MergePolicyForLevel
-> NominalDebt
-> Ref (MergingRun LevelMergeType m h)
-> m (IncomingRun m h)
newIncomingMergingRun MergePolicyForLevel
mergePolicy NominalDebt
nominalDebt Ref (MergingRun LevelMergeType m h)
mr
mergingRunParamsForLevel ::
ActiveDir
-> TableConfig
-> Unique
-> LevelNo
-> (RunParams, RunFsPaths)
mergingRunParamsForLevel :: ActiveDir
-> TableConfig -> Unique -> LevelNo -> (RunParams, RunFsPaths)
mergingRunParamsForLevel ActiveDir
dir TableConfig
conf Unique
unique LevelNo
ln =
(TableConfig -> RunLevelNo -> RunParams
runParamsForLevel TableConfig
conf (LevelNo -> RunLevelNo
RegularLevel LevelNo
ln), RunFsPaths
runPaths)
where
!runPaths :: RunFsPaths
runPaths = ActiveDir -> RunNumber -> RunFsPaths
Paths.runPath ActiveDir
dir (Unique -> RunNumber
uniqueToRunNumber Unique
unique)
mergePolicyForLevel ::
MergePolicy
-> LevelNo
-> Levels m h
-> UnionLevel m h
-> MergePolicyForLevel
mergePolicyForLevel :: forall (m :: * -> *) h.
MergePolicy
-> LevelNo -> Levels m h -> UnionLevel m h -> MergePolicyForLevel
mergePolicyForLevel MergePolicy
MergePolicyLazyLevelling (LevelNo Int
n) Levels m h
nextLevels UnionLevel m h
unionLevel
| Int
n Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
1
= MergePolicyForLevel
LevelTiering
| Levels m h -> Bool
forall a. Vector a -> Bool
V.null Levels m h
nextLevels
, UnionLevel m h
NoUnion <- UnionLevel m h
unionLevel
= MergePolicyForLevel
LevelLevelling
| Bool
otherwise
= MergePolicyForLevel
LevelTiering
maxRunSize ::
SizeRatio
-> WriteBufferAlloc
-> MergePolicyForLevel
-> LevelNo
-> NumEntries
maxRunSize :: SizeRatio
-> WriteBufferAlloc -> MergePolicyForLevel -> LevelNo -> NumEntries
maxRunSize SizeRatio
_ WriteBufferAlloc
_ MergePolicyForLevel
_ (LevelNo Int
ln)
| Int
ln Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< Int
0 = String -> NumEntries
forall a. HasCallStack => String -> a
error String
"maxRunSize: non-positive level number"
| Int
ln Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
0 = Int -> NumEntries
NumEntries Int
0
maxRunSize SizeRatio
sizeRatio (AllocNumEntries NumEntries
bufferSize) MergePolicyForLevel
LevelTiering LevelNo
ln =
Int -> NumEntries
NumEntries (Int -> NumEntries) -> Int -> NumEntries
forall a b. (a -> b) -> a -> b
$ Int -> NumEntries -> LevelNo -> Int
maxRunSizeTiering (SizeRatio -> Int
sizeRatioInt SizeRatio
sizeRatio) NumEntries
bufferSize LevelNo
ln
maxRunSize SizeRatio
sizeRatio (AllocNumEntries NumEntries
bufferSize) MergePolicyForLevel
LevelLevelling LevelNo
ln =
Int -> NumEntries
NumEntries (Int -> NumEntries) -> Int -> NumEntries
forall a b. (a -> b) -> a -> b
$ Int -> NumEntries -> LevelNo -> Int
maxRunSizeLevelling (SizeRatio -> Int
sizeRatioInt SizeRatio
sizeRatio) NumEntries
bufferSize LevelNo
ln
maxRunSizeTiering, maxRunSizeLevelling :: Int -> NumEntries -> LevelNo -> Int
maxRunSizeTiering :: Int -> NumEntries -> LevelNo -> Int
maxRunSizeTiering Int
sizeRatio (NumEntries Int
bufferSize) (LevelNo Int
ln) =
Int
bufferSize Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
sizeRatio Int -> Int -> Int
forall a b. (Num a, Integral b) => a -> b -> a
^ Int -> Int
forall a. Enum a => a -> a
pred Int
ln
maxRunSizeLevelling :: Int -> NumEntries -> LevelNo -> Int
maxRunSizeLevelling Int
sizeRatio NumEntries
bufferSize LevelNo
ln =
Int -> NumEntries -> LevelNo -> Int
maxRunSizeTiering Int
sizeRatio NumEntries
bufferSize (LevelNo -> LevelNo
forall a. Enum a => a -> a
succ LevelNo
ln)
maxRunSize' :: TableConfig -> MergePolicyForLevel -> LevelNo -> NumEntries
maxRunSize' :: TableConfig -> MergePolicyForLevel -> LevelNo -> NumEntries
maxRunSize' TableConfig
config MergePolicyForLevel
policy LevelNo
ln =
SizeRatio
-> WriteBufferAlloc -> MergePolicyForLevel -> LevelNo -> NumEntries
maxRunSize (TableConfig -> SizeRatio
confSizeRatio TableConfig
config) (TableConfig -> WriteBufferAlloc
confWriteBufferAlloc TableConfig
config) MergePolicyForLevel
policy LevelNo
ln
mergeTypeForLevel :: Levels m h -> UnionLevel m h -> MR.LevelMergeType
mergeTypeForLevel :: forall (m :: * -> *) h.
Levels m h -> UnionLevel m h -> LevelMergeType
mergeTypeForLevel Levels m h
levels UnionLevel m h
unionLevel
| Levels m h -> Bool
forall a. Vector a -> Bool
V.null Levels m h
levels, UnionLevel m h
NoUnion <- UnionLevel m h
unionLevel = LevelMergeType
MR.MergeLastLevel
| Bool
otherwise = LevelMergeType
MR.MergeMidLevel
levelIsFull :: SizeRatio -> V.Vector run -> Bool
levelIsFull :: forall run. SizeRatio -> Vector run -> Bool
levelIsFull SizeRatio
sr Vector run
rs = Vector run -> Int
forall a. Vector a -> Int
V.length Vector run
rs Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1 Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= (SizeRatio -> Int
sizeRatioInt SizeRatio
sr)
{-# SPECIALISE supplyCredits ::
TableConfig
-> NominalCredits
-> Levels IO h
-> IO ()
#-}
supplyCredits ::
(MonadSTM m, MonadST m, MonadMVar m, MonadMask m)
=> TableConfig
-> NominalCredits
-> Levels m h
-> m ()
supplyCredits :: forall (m :: * -> *) h.
(MonadSTM m, MonadST m, MonadMVar m, MonadMask m) =>
TableConfig -> NominalCredits -> Levels m h -> m ()
supplyCredits TableConfig
conf NominalCredits
deposit Levels m h
levels =
Levels m h -> (LevelNo -> Level m h -> m ()) -> m ()
forall (m :: * -> *) h.
Monad m =>
Levels m h -> (LevelNo -> Level m h -> m ()) -> m ()
iforLevelM_ Levels m h
levels ((LevelNo -> Level m h -> m ()) -> m ())
-> (LevelNo -> Level m h -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \LevelNo
ln (Level IncomingRun m h
ir Vector (Ref (Run m h))
_rs) ->
TableConfig -> LevelNo -> IncomingRun m h -> NominalCredits -> m ()
forall (m :: * -> *) h.
(MonadSTM m, MonadST m, MonadMVar m, MonadMask m) =>
TableConfig -> LevelNo -> IncomingRun m h -> NominalCredits -> m ()
supplyCreditsIncomingRun TableConfig
conf LevelNo
ln IncomingRun m h
ir NominalCredits
deposit
maxMergeDebt :: TableConfig -> MergePolicyForLevel -> LevelNo -> MergeDebt
maxMergeDebt :: TableConfig -> MergePolicyForLevel -> LevelNo -> MergeDebt
maxMergeDebt TableConfig {
confWriteBufferAlloc :: TableConfig -> WriteBufferAlloc
confWriteBufferAlloc = AllocNumEntries NumEntries
bufferSize,
SizeRatio
confSizeRatio :: TableConfig -> SizeRatio
confSizeRatio :: SizeRatio
confSizeRatio
} MergePolicyForLevel
mergePolicy LevelNo
ln =
let !sizeRatio :: Int
sizeRatio = SizeRatio -> Int
sizeRatioInt SizeRatio
confSizeRatio in
case MergePolicyForLevel
mergePolicy of
MergePolicyForLevel
LevelLevelling ->
MergeCredits -> MergeDebt
MergeDebt (MergeCredits -> MergeDebt)
-> (Int -> MergeCredits) -> Int -> MergeDebt
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> MergeCredits
MergeCredits (Int -> MergeDebt) -> Int -> MergeDebt
forall a b. (a -> b) -> a -> b
$
Int
sizeRatio Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int -> NumEntries -> LevelNo -> Int
maxRunSizeTiering Int
sizeRatio NumEntries
bufferSize (LevelNo -> LevelNo
forall a. Enum a => a -> a
pred LevelNo
ln)
Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int -> NumEntries -> LevelNo -> Int
maxRunSizeLevelling Int
sizeRatio NumEntries
bufferSize LevelNo
ln
MergePolicyForLevel
LevelTiering ->
MergeCredits -> MergeDebt
MergeDebt (MergeCredits -> MergeDebt)
-> (Int -> MergeCredits) -> Int -> MergeDebt
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> MergeCredits
MergeCredits (Int -> MergeDebt) -> Int -> MergeDebt
forall a b. (a -> b) -> a -> b
$
Int
maxRuns Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int -> NumEntries -> LevelNo -> Int
maxRunSizeTiering Int
sizeRatio NumEntries
bufferSize (LevelNo -> LevelNo
forall a. Enum a => a -> a
pred LevelNo
ln)
where
maxRuns :: Int
maxRuns = Int
sizeRatio Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1
nominalDebtForLevel :: TableConfig -> LevelNo -> NominalDebt
nominalDebtForLevel :: TableConfig -> LevelNo -> NominalDebt
nominalDebtForLevel TableConfig {
confWriteBufferAlloc :: TableConfig -> WriteBufferAlloc
confWriteBufferAlloc = AllocNumEntries !NumEntries
bufferSize,
SizeRatio
confSizeRatio :: TableConfig -> SizeRatio
confSizeRatio :: SizeRatio
confSizeRatio
} LevelNo
ln =
Int -> NominalDebt
NominalDebt (Int -> NumEntries -> LevelNo -> Int
maxRunSizeTiering (SizeRatio -> Int
sizeRatioInt SizeRatio
confSizeRatio) NumEntries
bufferSize LevelNo
ln)