Safe Haskell | Safe-Inferred |
---|---|
Language | GHC2021 |
Database.LSMTree.Internal.MergeSchedule
Synopsis
- data AtLevel a = AtLevel LevelNo a
- data MergeTrace
- = TraceFlushWriteBuffer NumEntries RunNumber RunParams
- | TraceAddLevel
- | TraceAddRun RunNumber (Vector RunNumber)
- | TraceNewMerge (Vector NumEntries) RunNumber RunParams MergePolicyForLevel LevelMergeType
- | TraceNewMergeSingleRun NumEntries RunNumber
- | TraceCompletedMerge NumEntries RunNumber
- | TraceExpectCompletedMerge RunNumber
- data TableContent m h = TableContent {
- tableWriteBuffer :: !WriteBuffer
- tableWriteBufferBlobs :: !(Ref (WriteBufferBlobs m h))
- tableLevels :: !(Levels m h)
- tableCache :: !(LevelsCache m h)
- tableUnionLevel :: !(UnionLevel m h)
- duplicateTableContent :: (PrimMonad m, MonadMask m) => ActionRegistry m -> TableContent m h -> m (TableContent m h)
- releaseTableContent :: (PrimMonad m, MonadMask m) => ActionRegistry m -> TableContent m h -> m ()
- data LevelsCache m h = LevelsCache_ {
- cachedRuns :: !(Vector (Ref (Run m h)))
- cachedFilters :: !(Vector (Bloom SerialisedKey))
- cachedIndexes :: !(Vector Index)
- cachedKOpsFiles :: !(Vector (Handle h))
- mkLevelsCache :: forall m h. (PrimMonad m, MonadMVar m, MonadMask m) => ActionRegistry m -> Levels m h -> m (LevelsCache m h)
- type Levels m h = Vector (Level m h)
- data Level m h = Level {
- incomingRun :: !(IncomingRun m h)
- residentRuns :: !(Vector (Ref (Run m h)))
- data MergePolicyForLevel
- mergingRunParamsForLevel :: ActiveDir -> TableConfig -> Unique -> LevelNo -> (RunParams, RunFsPaths)
- data UnionLevel m h
- = NoUnion
- | Union !(Ref (MergingTree m h))
- updatesWithInterleavedFlushes :: forall m h. (MonadMask m, MonadMVar m, MonadSTM m, MonadST m) => Tracer m (AtLevel MergeTrace) -> TableConfig -> ResolveSerialisedValue -> HasFS m h -> HasBlockIO m h -> SessionRoot -> UniqCounter m -> Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob) -> ActionRegistry m -> TableContent m h -> m (TableContent m h)
- flushWriteBuffer :: (MonadMask m, MonadMVar m, MonadST m, MonadSTM m) => Tracer m (AtLevel MergeTrace) -> TableConfig -> ResolveSerialisedValue -> HasFS m h -> HasBlockIO m h -> SessionRoot -> UniqCounter m -> ActionRegistry m -> TableContent m h -> m (TableContent m h)
- maxRunSize :: SizeRatio -> WriteBufferAlloc -> MergePolicyForLevel -> LevelNo -> NumEntries
- newtype MergeDebt = MergeDebt MergeCredits
- newtype MergeCredits = MergeCredits Int
- supplyCredits :: (MonadSTM m, MonadST m, MonadMVar m, MonadMask m) => TableConfig -> NominalCredits -> Levels m h -> m ()
- newtype NominalDebt = NominalDebt Int
- newtype NominalCredits = NominalCredits Int
- nominalDebtAsCredits :: NominalDebt -> NominalCredits
- nominalDebtForLevel :: TableConfig -> LevelNo -> NominalDebt
- addWriteBufferEntries :: (MonadSTM m, MonadThrow m, PrimMonad m) => HasFS m h -> ResolveSerialisedValue -> Ref (WriteBufferBlobs m h) -> NumEntries -> WriteBuffer -> Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob) -> m (WriteBuffer, Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob))
Traces
data MergeTrace Source #
Constructors
TraceFlushWriteBuffer NumEntries RunNumber RunParams
TraceAddLevel
TraceAddRun RunNumber (Vector RunNumber)
TraceNewMerge (Vector NumEntries) RunNumber RunParams MergePolicyForLevel LevelMergeType
TraceNewMergeSingleRun NumEntries RunNumber
TraceCompletedMerge NumEntries RunNumber
TraceExpectCompletedMerge RunNumber | This is traced at the latest point the merge could complete.
Instances
Show MergeTrace Source # | |
Defined in Database.LSMTree.Internal.MergeSchedule Methods showsPrec :: Int -> MergeTrace -> ShowS # show :: MergeTrace -> String # showList :: [MergeTrace] -> ShowS # |
Table content
data TableContent m h Source #
The levels of the table, from most to least recently inserted.

Concurrency: read-only operations are allowed to be concurrent with each other, but update operations must not be concurrent with each other or with read operations. For example, inspecting the levels cache can be done concurrently, but updatesWithInterleavedFlushes must be serialised.
Constructors
TableContent
Fields
tableWriteBuffer :: !WriteBuffer
tableWriteBufferBlobs :: !(Ref (WriteBufferBlobs m h))
tableLevels :: !(Levels m h)
tableCache :: !(LevelsCache m h)
tableUnionLevel :: !(UnionLevel m h)
duplicateTableContent :: (PrimMonad m, MonadMask m) => ActionRegistry m -> TableContent m h -> m (TableContent m h) Source #
releaseTableContent :: (PrimMonad m, MonadMask m) => ActionRegistry m -> TableContent m h -> m () Source #
Levels cache
data LevelsCache m h Source #
Flattened cache of the runs that are referenced by a table.
This cache includes a vector of runs, but also vectors of the runs broken down into components, like bloom filters, fence pointer indexes and file handles. This allows for quick access in the lookup code. Recomputing this cache should be relatively rare.
A cache keeps references to its runs on construction, and it releases each reference when the cache is invalidated. This is done so that incremental merges can remove references for their input runs when a merge completes, without closing runs that might be in use for other operations such as lookups. This does mean that a cache can keep runs open for longer than necessary, so caches should be rebuilt, e.g. using rebuildCache, in a timely manner.
Constructors
LevelsCache_
Fields
cachedRuns :: !(Vector (Ref (Run m h)))
cachedFilters :: !(Vector (Bloom SerialisedKey))
cachedIndexes :: !(Vector Index)
cachedKOpsFiles :: !(Vector (Handle h))
mkLevelsCache :: forall m h. (PrimMonad m, MonadMVar m, MonadMask m) => ActionRegistry m -> Levels m h -> m (LevelsCache m h) Source #
Flatten the argument Levels into a single vector of runs, including all runs that are inputs to an ongoing merge. Use that to populate the LevelsCache. The cache will take a reference for each of its runs.
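To illustrate why the cache stores these flattened, positionally aligned vectors, here is a hypothetical helper (not part of this module) that selects the runs whose bloom filter reports a possible hit for a key. It assumes the module's own types are in scope and that Bloom.elem is the key-membership test of the Bloom type used for cachedFilters; both are assumptions for the sketch, not names defined here.

```haskell
import qualified Data.Vector as V

-- Hypothetical helper, assuming a membership test
-- 'Bloom.elem :: SerialisedKey -> Bloom SerialisedKey -> Bool'
-- for the bloom filter implementation in use.
candidateRuns :: SerialisedKey -> LevelsCache m h -> V.Vector (Ref (Run m h))
candidateRuns k cache =
    V.map fst
  $ V.filter (\(_run, bfilter) -> Bloom.elem k bfilter)
  $ V.zip (cachedRuns cache) (cachedFilters cache)
```

Because the vectors are aligned by position, a lookup can consult every filter (and, analogously, every index and file handle) without re-traversing the levels structure.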
Levels, runs and ongoing merges
data Level m h Source #

A level is a sequence of resident runs, prefixed by an incoming run, which is usually multiple runs that are being merged. Once completed, the resulting run will become a resident run at this level.

Constructors
Level
Fields
incomingRun :: !(IncomingRun m h)
residentRuns :: !(Vector (Ref (Run m h)))
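As a small illustration of walking this structure, the following hypothetical helper (not part of this module) flattens the resident runs of all levels into a single vector. It deliberately ignores the incoming runs, whose input runs are reachable through IncomingRun instead.

```haskell
import qualified Data.Vector as V

-- Hypothetical helper: all resident runs across the levels, ordered from the
-- level holding the most recently flushed data down to the oldest.
allResidentRuns :: Levels m h -> V.Vector (Ref (Run m h))
allResidentRuns = V.concatMap residentRuns
```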
data MergePolicyForLevel Source #
Constructors
LevelTiering
LevelLevelling |
Instances
Show MergePolicyForLevel Source # | |
Defined in Database.LSMTree.Internal.IncomingRun Methods showsPrec :: Int -> MergePolicyForLevel -> ShowS # show :: MergePolicyForLevel -> String # showList :: [MergePolicyForLevel] -> ShowS # | |
NFData MergePolicyForLevel Source # | |
Defined in Database.LSMTree.Internal.IncomingRun Methods rnf :: MergePolicyForLevel -> () # | |
Eq MergePolicyForLevel Source # | |
Defined in Database.LSMTree.Internal.IncomingRun Methods (==) :: MergePolicyForLevel -> MergePolicyForLevel -> Bool # (/=) :: MergePolicyForLevel -> MergePolicyForLevel -> Bool # | |
DecodeVersioned MergePolicyForLevel Source # | |
Defined in Database.LSMTree.Internal.Snapshot.Codec Methods decodeVersioned :: SnapshotVersion -> Decoder s MergePolicyForLevel Source # | |
Encode MergePolicyForLevel Source # | |
Defined in Database.LSMTree.Internal.Snapshot.Codec Methods encode :: MergePolicyForLevel -> Encoding Source # |
mergingRunParamsForLevel :: ActiveDir -> TableConfig -> Unique -> LevelNo -> (RunParams, RunFsPaths) Source #
Union level
data UnionLevel m h Source #
An additional, optional last level, created as a result of union. It can contain not only an ongoing merge of multiple runs, but a nested tree of merges.

TODO: So far, this is

- not considered when creating cursors (also used for range lookups)
- never merged into the regular levels
Constructors
NoUnion
Union !(Ref (MergingTree m h)) |
Flushes and scheduled merges
updatesWithInterleavedFlushes :: forall m h. (MonadMask m, MonadMVar m, MonadSTM m, MonadST m) => Tracer m (AtLevel MergeTrace) -> TableConfig -> ResolveSerialisedValue -> HasFS m h -> HasBlockIO m h -> SessionRoot -> UniqCounter m -> Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob) -> ActionRegistry m -> TableContent m h -> m (TableContent m h) Source #
A single batch of updates can fill up the write buffer multiple times. We flush the write buffer each time it fills up before trying to fill it up again.
TODO: in practice the size of a batch will be much smaller than the maximum size of the write buffer, so we should optimise for the case that small batches are inserted. Ideas:
- we can allow a range of sizes to flush to disk rather than just the max size
- could do a map bulk merge rather than sequential insert, on the prefix of the batch that's guaranteed to fit
- or flush the existing buffer if we would expect the next batch to cause the buffer to become too large
TODO: we could also optimise for the case where the write buffer is small compared to the size of the batch, but it is less critical. In particular, in case the write buffer is empty, or if it fills up multiple times for a single batch of updates, we might be able to skip adding entries to the write buffer for a large part. When the write buffer is empty, we can sort and deduplicate the vector of updates directly, slice it up into properly sized sub-vectors, and write those to disk. Of course, any remainder that did not fit into a whole run should then end up in a fresh write buffer.
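The fill/flush control flow described above can be sketched with the two functions documented in this module, addWriteBufferEntries and flushWriteBuffer. This is only an illustration, not the real implementation: it elides tracing and blob-file details, as well as the exact handling of a buffer that ends up exactly full, and the maxSize argument is a stand-in for the write buffer capacity that the real code derives from the TableConfig.

```haskell
import qualified Data.Vector as V

-- Sketch of the fill/flush loop, assuming this module's types and the two
-- functions documented here are in scope; the constraint set is a superset
-- of what the individual functions require.
updatesSketch ::
     (MonadMask m, MonadMVar m, MonadSTM m, MonadST m, PrimMonad m)
  => Tracer m (AtLevel MergeTrace)
  -> TableConfig
  -> ResolveSerialisedValue
  -> HasFS m h
  -> HasBlockIO m h
  -> SessionRoot
  -> UniqCounter m
  -> NumEntries   -- ^ write buffer capacity (derived from the TableConfig in the real code)
  -> V.Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)
  -> ActionRegistry m
  -> TableContent m h
  -> m (TableContent m h)
updatesSketch tr conf resolve hfs hbio root uc maxSize es reg tc
  | V.null es = pure tc
  | otherwise = do
      -- Absorb as much of the batch as fits into the current write buffer.
      (wb', es') <-
        addWriteBufferEntries hfs resolve (tableWriteBufferBlobs tc)
                              maxSize (tableWriteBuffer tc) es
      let tc' = tc { tableWriteBuffer = wb' }
      if V.null es'
        then pure tc'  -- whole batch absorbed; buffer may be partially full
        else do
          -- Buffer is full and entries remain: flush it, then keep going.
          tc'' <- flushWriteBuffer tr conf resolve hfs hbio root uc reg tc'
          updatesSketch tr conf resolve hfs hbio root uc maxSize es' reg tc''
```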
flushWriteBuffer :: (MonadMask m, MonadMVar m, MonadST m, MonadSTM m) => Tracer m (AtLevel MergeTrace) -> TableConfig -> ResolveSerialisedValue -> HasFS m h -> HasBlockIO m h -> SessionRoot -> UniqCounter m -> ActionRegistry m -> TableContent m h -> m (TableContent m h) Source #
Flush the write buffer to disk, regardless of whether it is full or not.
The returned table content contains an updated set of levels, where the write buffer is inserted into level 1.
Exported for cabal-docspec
maxRunSize :: SizeRatio -> WriteBufferAlloc -> MergePolicyForLevel -> LevelNo -> NumEntries Source #
Compute the maximum size of a run for a given level.
The size of a tiering run at each level is allowed to be bufferSize*sizeRatio^(level-1) < size <= bufferSize*sizeRatio^level.

>>> unNumEntries . maxRunSize Four (AllocNumEntries (NumEntries 2)) LevelTiering . LevelNo <$> [0, 1, 2, 3, 4]
[0,2,8,32,128]
The size of a levelling run at each level is allowed to be bufferSize*sizeRatio^(level-1) < size <= bufferSize*sizeRatio^(level+1). A levelling run can take up a whole level, so the maximum size of a run is sizeRatio times larger than the maximum size of a tiering run on the same level.

>>> unNumEntries . maxRunSize Four (AllocNumEntries (NumEntries 2)) LevelLevelling . LevelNo <$> [0, 1, 2, 3, 4]
[0,8,32,128,512]
Credits
newtype MergeDebt Source #
Constructors
MergeDebt MergeCredits
Instances
NFData MergeDebt Source # | |
Defined in Database.LSMTree.Internal.MergingRun | |
Eq MergeDebt Source # | |
Ord MergeDebt Source # | |
Defined in Database.LSMTree.Internal.MergingRun | |
DecodeVersioned MergeDebt Source # | |
Defined in Database.LSMTree.Internal.Snapshot.Codec Methods decodeVersioned :: SnapshotVersion -> Decoder s MergeDebt Source # | |
Encode MergeDebt Source # | |
newtype MergeCredits Source #
Constructors
MergeCredits Int
Instances
supplyCredits :: (MonadSTM m, MonadST m, MonadMVar m, MonadMask m) => TableConfig -> NominalCredits -> Levels m h -> m () Source #
Supply the given amount of credits to each merge in the levels structure. This may cause some merges to progress.
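A hypothetical usage sketch, not taken from the library: since NominalCredits corresponds to the number of update operations inserted into the table (see NominalCredits below), a caller could credit the levels with one nominal credit per update after inserting a batch, so that scheduled merges keep pace with the rate of insertions. The name creditBatch is invented for this sketch.

```haskell
-- Hypothetical: one nominal credit per update in the batch just inserted.
creditBatch ::
     (MonadSTM m, MonadST m, MonadMVar m, MonadMask m)
  => TableConfig
  -> Int           -- ^ number of updates just inserted into the table
  -> Levels m h
  -> m ()
creditBatch conf nUpdates = supplyCredits conf (NominalCredits nUpdates)
```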
newtype NominalDebt Source #
Total merge debt to complete the merge in an incoming run.
This corresponds to the number of update operations (in the worst case, the minimum number) inserted into the table before we expect the merge to complete.
Constructors
NominalDebt Int
Instances
NFData NominalDebt Source # | |
Defined in Database.LSMTree.Internal.IncomingRun Methods rnf :: NominalDebt -> () # | |
Eq NominalDebt Source # | |
Defined in Database.LSMTree.Internal.IncomingRun | |
DecodeVersioned NominalDebt Source # | |
Defined in Database.LSMTree.Internal.Snapshot.Codec Methods decodeVersioned :: SnapshotVersion -> Decoder s NominalDebt Source # | |
Encode NominalDebt Source # | |
Defined in Database.LSMTree.Internal.Snapshot.Codec Methods encode :: NominalDebt -> Encoding Source # |
newtype NominalCredits Source #
Merge credits that get supplied to a table's levels.
This corresponds to the number of update operations inserted into the table.
Constructors
NominalCredits Int
Instances
nominalDebtForLevel :: TableConfig -> LevelNo -> NominalDebt Source #
The nominal debt equals the minimum number of credits we will supply before we expect the merge to complete. This is the same as the number of updates in a run that gets moved to this level.
Exported for testing
addWriteBufferEntries :: (MonadSTM m, MonadThrow m, PrimMonad m) => HasFS m h -> ResolveSerialisedValue -> Ref (WriteBufferBlobs m h) -> NumEntries -> WriteBuffer -> Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob) -> m (WriteBuffer, Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)) Source #
Add entries to the write buffer up until a certain write buffer size n.

NOTE: if the write buffer is already larger than n, this is a no-op.