| Safe Haskell | Safe-Inferred | 
|---|---|
| Language | GHC2021 | 
Database.LSMTree.Internal.MergeSchedule
Synopsis
- data AtLevel a = AtLevel LevelNo a
 - data MergeTrace
    = TraceFlushWriteBuffer NumEntries RunNumber RunParams
    | TraceAddLevel
    | TraceAddRun RunNumber (Vector RunNumber)
    | TraceNewMerge (Vector NumEntries) RunNumber RunParams MergePolicyForLevel LevelMergeType
    | TraceNewMergeSingleRun NumEntries RunNumber
    | TraceCompletedMerge NumEntries RunNumber
    | TraceExpectCompletedMerge RunNumber
 
 - data TableContent m h = TableContent {
      tableWriteBuffer :: !WriteBuffer
    , tableWriteBufferBlobs :: !(Ref (WriteBufferBlobs m h))
    , tableLevels :: !(Levels m h)
    , tableCache :: !(LevelsCache m h)
    , tableUnionLevel :: !(UnionLevel m h)
    }
 
 - duplicateTableContent :: (PrimMonad m, MonadMask m) => ActionRegistry m -> TableContent m h -> m (TableContent m h)
 - releaseTableContent :: (PrimMonad m, MonadMask m) => ActionRegistry m -> TableContent m h -> m ()
 - data LevelsCache m h = LevelsCache_ {
      cachedRuns :: !(Vector (Ref (Run m h)))
    , cachedFilters :: !(Vector (Bloom SerialisedKey))
    , cachedIndexes :: !(Vector Index)
    , cachedKOpsFiles :: !(Vector (Handle h))
    }
 
 - mkLevelsCache :: forall m h. (PrimMonad m, MonadMVar m, MonadMask m) => ActionRegistry m -> Levels m h -> m (LevelsCache m h)
 - type Levels m h = Vector (Level m h)
 - data Level m h = Level {
      incomingRun :: !(IncomingRun m h)
    , residentRuns :: !(Vector (Ref (Run m h)))
    }
 
 - data MergePolicyForLevel
 - mergingRunParamsForLevel :: ActiveDir -> TableConfig -> Unique -> LevelNo -> (RunParams, RunFsPaths)
 - data UnionLevel m h
    = NoUnion
    | Union !(Ref (MergingTree m h)) !(UnionCache m h)
 
 - newtype UnionCache m h = UnionCache {
      cachedTree :: LookupTree (Vector (Ref (Run m h)))
    }
 
 - mkUnionCache :: (PrimMonad m, MonadMVar m, MonadMask m) => ActionRegistry m -> Ref (MergingTree m h) -> m (UnionCache m h)
 - duplicateUnionCache :: (PrimMonad m, MonadMask m) => ActionRegistry m -> UnionCache m h -> m (UnionCache m h)
 - releaseUnionCache :: (PrimMonad m, MonadMask m) => ActionRegistry m -> UnionCache m h -> m ()
 - updatesWithInterleavedFlushes :: forall m h. (MonadMask m, MonadMVar m, MonadSTM m, MonadST m) => Tracer m (AtLevel MergeTrace) -> TableConfig -> ResolveSerialisedValue -> HasFS m h -> HasBlockIO m h -> RefCtx -> SessionRoot -> Salt -> UniqCounter m -> Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob) -> ActionRegistry m -> TableContent m h -> m (TableContent m h)
 - flushWriteBuffer :: (MonadMask m, MonadMVar m, MonadST m, MonadSTM m) => Tracer m (AtLevel MergeTrace) -> TableConfig -> ResolveSerialisedValue -> HasFS m h -> HasBlockIO m h -> RefCtx -> SessionRoot -> Salt -> UniqCounter m -> ActionRegistry m -> TableContent m h -> m (TableContent m h)
 - maxRunSize :: SizeRatio -> WriteBufferAlloc -> MergePolicyForLevel -> LevelNo -> NumEntries
 - newtype MergeDebt = MergeDebt MergeCredits
 - newtype MergeCredits = MergeCredits Int
 - supplyCredits :: (MonadSTM m, MonadST m, MonadMVar m, MonadMask m) => RefCtx -> TableConfig -> NominalCredits -> Levels m h -> m ()
 - newtype NominalDebt = NominalDebt Int
 - newtype NominalCredits = NominalCredits Int
 - nominalDebtAsCredits :: NominalDebt -> NominalCredits
 - nominalDebtForLevel :: TableConfig -> LevelNo -> NominalDebt
 - addWriteBufferEntries :: (MonadSTM m, MonadThrow m, PrimMonad m) => HasFS m h -> ResolveSerialisedValue -> Ref (WriteBufferBlobs m h) -> NumEntries -> WriteBuffer -> Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob) -> m (WriteBuffer, Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob))
 
Traces
data MergeTrace
Constructors
| TraceFlushWriteBuffer NumEntries RunNumber RunParams | |
| TraceAddLevel | |
| TraceAddRun RunNumber (Vector RunNumber) | |
| TraceNewMerge (Vector NumEntries) RunNumber RunParams MergePolicyForLevel LevelMergeType | |
| TraceNewMergeSingleRun NumEntries RunNumber | |
| TraceCompletedMerge NumEntries RunNumber | |
| TraceExpectCompletedMerge RunNumber | This is traced at the latest point at which the merge could complete. |
Instances
| Show MergeTrace | Defined in Database.LSMTree.Internal.MergeSchedule. Methods: showsPrec :: Int -> MergeTrace -> ShowS; show :: MergeTrace -> String; showList :: [MergeTrace] -> ShowS |
| Eq MergeTrace | Defined in Database.LSMTree.Internal.MergeSchedule |
Table content
data TableContent m h
The levels of the table, from most to least recently inserted.
Concurrency: read-only operations may run concurrently with each other, but update operations must not run concurrently with each other or with read operations. For example, inspecting the levels cache can be done concurrently, but calls to updatesWithInterleavedFlushes must be serialised.
Constructors
| TableContent | |
Fields 
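| tableWriteBuffer :: !WriteBuffer | |
| tableWriteBufferBlobs :: !(Ref (WriteBufferBlobs m h)) | |
| tableLevels :: !(Levels m h) | |
| tableCache :: !(LevelsCache m h) | |
| tableUnionLevel :: !(UnionLevel m h) | |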
duplicateTableContent :: (PrimMonad m, MonadMask m) => ActionRegistry m -> TableContent m h -> m (TableContent m h)
releaseTableContent :: (PrimMonad m, MonadMask m) => ActionRegistry m -> TableContent m h -> m ()
Levels cache
data LevelsCache m h
Flattened cache of the runs that are referenced by a table.
This cache includes a vector of runs, but also vectors of the runs broken down into components, such as Bloom filters, fence pointer indexes and file handles. This allows for quick access in the lookup code. Recomputing this cache should be relatively rare.
A cache takes a reference to each of its runs on construction, and releases each reference when the cache is invalidated. This is done so that incremental merges can release their references to their input runs when a merge completes, without closing runs that might be in use by other operations such as lookups. It does mean that a cache can keep runs open for longer than necessary, so caches should be rebuilt in a timely manner, e.g. using rebuildCache.
Constructors
| LevelsCache_ | |
Fields 
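| cachedRuns :: !(Vector (Ref (Run m h))) | |
| cachedFilters :: !(Vector (Bloom SerialisedKey)) | |
| cachedIndexes :: !(Vector Index) | |
| cachedKOpsFiles :: !(Vector (Handle h)) | |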
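To illustrate why a cache that holds its own references can keep a run open after the merge that consumed it has completed, here is a minimal reference-counting sketch in plain Haskell. RcRun, dupRun, releaseRun and scenario are hypothetical names; lsm-tree's actual Ref machinery is not reproduced here.

```haskell
import Data.IORef

-- Hypothetical reference-counted run: a counter plus an "open" flag stand in
-- for a Ref (Run m h) and its file handles.
data RcRun = RcRun { refCount :: IORef Int, isOpen :: IORef Bool }

-- A fresh run starts with one reference (its owner) and is open.
newRcRun :: IO RcRun
newRcRun = RcRun <$> newIORef 1 <*> newIORef True

-- Take an additional reference, as a cache does when it is constructed.
dupRun :: RcRun -> IO ()
dupRun r = atomicModifyIORef' (refCount r) (\c -> (c + 1, ()))

-- Drop a reference; the run is closed when the last reference is released.
releaseRun :: RcRun -> IO ()
releaseRun r = do
  n <- atomicModifyIORef' (refCount r) (\c -> (c - 1, c - 1))
  if n == 0 then writeIORef (isOpen r) False else pure ()

-- A level owns an input run, a cache takes a second reference, the merge
-- completes (the level drops its reference), and later the cache is rebuilt.
scenario :: IO (Bool, Bool)
scenario = do
  run <- newRcRun            -- owned by the level: count 1
  dupRun run                 -- cache built: count 2
  releaseRun run             -- merge completed: count 1, still open
  openAfterMerge <- readIORef (isOpen run)
  releaseRun run             -- cache invalidated: count 0, closed
  openAfterRebuild <- readIORef (isOpen run)
  pure (openAfterMerge, openAfterRebuild)

main :: IO ()
main = scenario >>= print
```

Running it prints (True,False): the run survives the completed merge and is only closed once the cache lets go, which is exactly why stale caches should not linger.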
mkLevelsCache :: forall m h. (PrimMonad m, MonadMVar m, MonadMask m) => ActionRegistry m -> Levels m h -> m (LevelsCache m h)
Flatten the argument Levels into a single vector of runs, including all runs that are inputs to an ongoing merge, and use that to populate the LevelsCache. The cache takes a reference to each of its runs.
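The flattening that mkLevelsCache performs can be sketched with lists in place of vectors. The types Run, IncomingRun, Level and Cache below are hypothetical simplifications (strings stand in for Bloom filters and indexes), and listing a level's incoming runs before its resident runs is an assumption of this sketch.

```haskell
-- Hypothetical stand-ins: lists instead of vectors, strings instead of
-- Bloom filters and fence pointer indexes.
data Run = Run { runId :: Int, runFilter :: String, runIndex :: String }
  deriving (Show, Eq)

-- An incoming run is either a merge in progress (its inputs are still live
-- runs) or a single run that is already available.
data IncomingRun = Merging [Run] | Single Run

data Level = Level { incomingRun :: IncomingRun, residentRuns :: [Run] }

-- The flattened runs and their components, stored side by side so that the
-- i-th filter and index belong to the i-th run.
data Cache = Cache
  { cachedRuns    :: [Run]
  , cachedFilters :: [String]
  , cachedIndexes :: [String]
  }

incomingRuns :: IncomingRun -> [Run]
incomingRuns (Merging inputs) = inputs
incomingRuns (Single r)       = [r]

-- Flatten level by level; within a level, incoming runs (or the inputs of an
-- ongoing merge) come before the resident runs.
mkCache :: [Level] -> Cache
mkCache levels = Cache runs (map runFilter runs) (map runIndex runs)
  where
    runs = concatMap (\l -> incomingRuns (incomingRun l) ++ residentRuns l) levels

exampleLevels :: [Level]
exampleLevels =
  [ Level (Merging [mk 1, mk 2]) [mk 3]
  , Level (Single (mk 4)) []
  ]
  where
    mk i = Run i ("filter" ++ show i) ("index" ++ show i)

main :: IO ()
main = print (map runId (cachedRuns (mkCache exampleLevels)))
```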
Levels, runs and ongoing merges
type Levels m h = Vector (Level m h)
data Level m h
A level is a sequence of resident runs at this level, prefixed by an incoming run, which is usually a merge of multiple runs that is still in progress. Once the merge completes, the resulting run becomes a resident run at this level.
Constructors
| Level | |
Fields 
| incomingRun :: !(IncomingRun m h) | |
| residentRuns :: !(Vector (Ref (Run m h))) | |
data MergePolicyForLevel
Constructors
| LevelTiering | |
| LevelLevelling | 
Instances
| Show MergePolicyForLevel | Defined in Database.LSMTree.Internal.IncomingRun. Methods: showsPrec :: Int -> MergePolicyForLevel -> ShowS; show :: MergePolicyForLevel -> String; showList :: [MergePolicyForLevel] -> ShowS |
| NFData MergePolicyForLevel | Defined in Database.LSMTree.Internal.IncomingRun. Methods: rnf :: MergePolicyForLevel -> () |
| Eq MergePolicyForLevel | Defined in Database.LSMTree.Internal.IncomingRun. Methods: (==) :: MergePolicyForLevel -> MergePolicyForLevel -> Bool; (/=) :: MergePolicyForLevel -> MergePolicyForLevel -> Bool |
| DecodeVersioned MergePolicyForLevel | Defined in Database.LSMTree.Internal.Snapshot.Codec. Methods: decodeVersioned :: SnapshotVersion -> Decoder s MergePolicyForLevel |
| Encode MergePolicyForLevel | Defined in Database.LSMTree.Internal.Snapshot.Codec. Methods: encode :: MergePolicyForLevel -> Encoding |
mergingRunParamsForLevel :: ActiveDir -> TableConfig -> Unique -> LevelNo -> (RunParams, RunFsPaths)
Union level
data UnionLevel m h
An additional, optional last level, created as a result of union. It can contain not only an ongoing merge of multiple runs, but also a nested tree of merges.
TODO: So far, this level is:
- not considered when creating cursors (also used for range lookups)
- never merged into the regular levels
Constructors
| NoUnion | |
| Union !(Ref (MergingTree m h)) !(UnionCache m h) | 
Union cache
newtype UnionCache m h
Similar to the LevelsCache, but tree-shaped, since this structure is required to combine the individual lookup results.
Constructors
| UnionCache | |
Fields 
| cachedTree :: LookupTree (Vector (Ref (Run m h))) | |
mkUnionCache :: (PrimMonad m, MonadMVar m, MonadMask m) => ActionRegistry m -> Ref (MergingTree m h) -> m (UnionCache m h)
duplicateUnionCache :: (PrimMonad m, MonadMask m) => ActionRegistry m -> UnionCache m h -> m (UnionCache m h)
releaseUnionCache :: (PrimMonad m, MonadMask m) => ActionRegistry m -> UnionCache m h -> m ()
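A tree is needed because lookup results from different parts of a union are combined differently. The sketch below assumes a hypothetical Tree type with three node kinds and an Int-valued resolve function; it is not the real LookupTree, and it ignores deletes and the other entry kinds that real lookups must handle.

```haskell
import qualified Data.Map.Strict as Map
import Data.Maybe (listToMaybe, mapMaybe)

-- A run is modelled as a map from keys to values.
type Run = Map.Map String Int

-- Hypothetical tree shape; the real cachedTree has type
-- LookupTree (Vector (Ref (Run m h))), whose constructors are not shown here.
data Tree
  = Batch [Run]       -- a batch of runs, newest first
  | LevelNode [Tree]  -- children newest first: a newer hit shadows older ones
  | UnionNode [Tree]  -- unioned inputs: all hits are resolved together

lookupTree :: (Int -> Int -> Int) -> String -> Tree -> Maybe Int
lookupTree _ k (Batch runs) = listToMaybe (mapMaybe (Map.lookup k) runs)
lookupTree resolve k (LevelNode ts) = listToMaybe (mapMaybe (lookupTree resolve k) ts)
lookupTree resolve k (UnionNode ts) =
  case mapMaybe (lookupTree resolve k) ts of
    [] -> Nothing
    vs -> Just (foldr1 resolve vs)

example :: Tree
example =
  UnionNode
    [ Batch [Map.fromList [("a", 1)]]
    , LevelNode
        [ Batch [Map.fromList [("a", 10)]]
        , Batch [Map.fromList [("a", 100), ("b", 5)]]
        ]
    ]

main :: IO ()
main = mapM_ (\k -> print (lookupTree (+) k example)) ["a", "b", "c"]
```

With (+) as the resolve function, key "a" yields Just 11: inside the level node the newer 10 shadows 100, and the union then combines 1 and 10. A flat vector of runs could not express this mix of shadowing and combining.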
Flushes and scheduled merges
updatesWithInterleavedFlushes :: forall m h. (MonadMask m, MonadMVar m, MonadSTM m, MonadST m) => Tracer m (AtLevel MergeTrace) -> TableConfig -> ResolveSerialisedValue -> HasFS m h -> HasBlockIO m h -> RefCtx -> SessionRoot -> Salt -> UniqCounter m -> Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob) -> ActionRegistry m -> TableContent m h -> m (TableContent m h)
A single batch of updates can fill up the write buffer multiple times. We flush the write buffer each time it fills up before trying to fill it up again.
TODO: in practice the size of a batch will be much smaller than the maximum size of the write buffer, so we should optimise for the case that small batches are inserted. Ideas:
- we could allow a range of sizes to be flushed to disk rather than just the maximum size
- we could do a bulk map merge rather than sequential inserts, on the prefix of the batch that is guaranteed to fit
- or we could flush the existing buffer if we expect the next batch to cause the buffer to become too large
 
TODO: we could also optimise for the case where the write buffer is small compared to the size of the batch, but it is less critical. In particular, in case the write buffer is empty, or if it fills up multiple times for a single batch of updates, we might be able to skip adding entries to the write buffer for a large part. When the write buffer is empty, we can sort and deduplicate the vector of updates directly, slice it up into properly sized sub-vectors, and write those to disk. Of course, any remainder that did not fit into a whole run should then end up in a fresh write buffer.
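The basic interleaving of filling and flushing can be sketched purely, with a Data.Map as the write buffer and a flush modelled as turning the buffer into a sorted list. fill and updatesWithFlushes are hypothetical names; unlike the real code, a repeated key simply overwrites the older value instead of being resolved, and no runs are written to disk.

```haskell
import qualified Data.Map.Strict as Map

type WriteBuffer = Map.Map Int String

-- Add updates until the buffer holds `cap` distinct keys; return the rest.
-- A repeated key overwrites the older value in this sketch.
fill :: Int -> WriteBuffer -> [(Int, String)] -> (WriteBuffer, [(Int, String)])
fill _ wb [] = (wb, [])
fill cap wb updates@((k, v) : rest)
  | Map.size wb >= cap = (wb, updates)
  | otherwise          = fill cap (Map.insert k v wb) rest

-- Every time the buffer is full it is "flushed" (turned into a sorted run)
-- and filling resumes with an empty buffer. Returns the flushed runs, oldest
-- first, and the final, not-yet-full write buffer.
updatesWithFlushes
  :: Int -> WriteBuffer -> [(Int, String)] -> ([[(Int, String)]], WriteBuffer)
updatesWithFlushes cap wb0 updates0
  | cap <= 0  = error "updatesWithFlushes: capacity must be positive"
  | otherwise = go [] wb0 updates0
  where
    go flushed wb updates =
      let (wb', rest) = fill cap wb updates
      in if Map.size wb' >= cap
           then go (Map.toAscList wb' : flushed) Map.empty rest
           else (reverse flushed, wb')

main :: IO ()
main = do
  let (runs, wb) = updatesWithFlushes 3 Map.empty [(k, show k) | k <- [1 .. 7]]
  print (map (map fst) runs)  -- keys of each flushed run
  print (Map.keys wb)         -- keys left in the write buffer
```

A batch of seven distinct keys with a capacity of three fills the buffer twice, so it produces two flushed runs and leaves one key in the buffer.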
flushWriteBuffer :: (MonadMask m, MonadMVar m, MonadST m, MonadSTM m) => Tracer m (AtLevel MergeTrace) -> TableConfig -> ResolveSerialisedValue -> HasFS m h -> HasBlockIO m h -> RefCtx -> SessionRoot -> Salt -> UniqCounter m -> ActionRegistry m -> TableContent m h -> m (TableContent m h)
Flush the write buffer to disk, regardless of whether it is full or not.
The returned table content contains an updated set of levels, where the write buffer is inserted into level 1.
Exported for cabal-docspec
maxRunSize :: SizeRatio -> WriteBufferAlloc -> MergePolicyForLevel -> LevelNo -> NumEntries
Compute the maximum size of a run for a given level.
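The size of a tiering run at each level >= 1 is allowed to be
 bufferSize*sizeRatio^(level-2) < size <= bufferSize*sizeRatio^(level-1),
 so a tiering run at level 1 is at most one write buffer in size. Level 0 holds no runs, so its maximum is 0.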
>>> unNumEntries . maxRunSize Four (AllocNumEntries 2) LevelTiering . LevelNo <$> [0, 1, 2, 3, 4]
[0,2,8,32,128]
The size of a levelling run at each level >= 1 is allowed to be
 bufferSize*sizeRatio^(level-2) < size <= bufferSize*sizeRatio^level. A
 levelling run can take up a whole level, so the maximum size of a run is
 sizeRatio times larger than the maximum size of a tiering run on the same level.
>>> unNumEntries . maxRunSize Four (AllocNumEntries 2) LevelLevelling . LevelNo <$> [0, 1, 2, 3, 4]
[0,8,32,128,512]
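The doctest values follow from a simple closed form, sketched here over plain Ints. maxRunSizeSketch and Policy are hypothetical stand-ins for maxRunSize and MergePolicyForLevel, and the formula is reconstructed from the doctest outputs rather than taken from the implementation.

```haskell
data Policy = Tiering | Levelling
  deriving (Show, Eq)

-- Level 0 holds no runs; a tiering run at level l >= 1 holds at most
-- bufferSize * sizeRatio^(l-1) entries, and a levelling run at most
-- sizeRatio times that.
maxRunSizeSketch :: Int -> Int -> Policy -> Int -> Int
maxRunSizeSketch sizeRatio bufferSize policy level
  | level < 0  = error "maxRunSizeSketch: negative level"
  | level == 0 = 0
  | otherwise  = case policy of
      Tiering   -> tiering
      Levelling -> tiering * sizeRatio
  where
    -- only evaluated for level >= 1, so the exponent is never negative
    tiering = bufferSize * sizeRatio ^ (level - 1)

main :: IO ()
main = do
  print (map (maxRunSizeSketch 4 2 Tiering) [0 .. 4])    -- [0,2,8,32,128]
  print (map (maxRunSizeSketch 4 2 Levelling) [0 .. 4])  -- [0,8,32,128,512]
```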
Credits
newtype MergeDebt
Constructors
| MergeDebt MergeCredits | 
Instances
| NFData MergeDebt | Defined in Database.LSMTree.Internal.MergingRun |
| Eq MergeDebt | |
| Ord MergeDebt | Defined in Database.LSMTree.Internal.MergingRun |
| DecodeVersioned MergeDebt | Defined in Database.LSMTree.Internal.Snapshot.Codec. Methods: decodeVersioned :: SnapshotVersion -> Decoder s MergeDebt |
| Encode MergeDebt | |
newtype MergeCredits
Constructors
| MergeCredits Int | 
Instances
supplyCredits :: (MonadSTM m, MonadST m, MonadMVar m, MonadMask m) => RefCtx -> TableConfig -> NominalCredits -> Levels m h -> m ()
Supply the given amount of credits to each merge in the levels structure. This may cause some merges to progress.
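A simplified model of supplying the same amount of credits to every merge. MergeState and supplyCreditsSketch are hypothetical: each merge is reduced to a count of outstanding credits, the conversion from nominal credits to merge credits is left out, and surplus credits are simply discarded.

```haskell
-- Hypothetical model: each level's merge records how many credits it still
-- needs before it is complete.
data MergeState = Ongoing Int | Completed
  deriving (Show, Eq)

-- Give the same number of credits to every ongoing merge; a merge whose
-- remaining debt is covered completes.
supplyCreditsSketch :: Int -> [MergeState] -> [MergeState]
supplyCreditsSketch credits = map step
  where
    step Completed = Completed
    step (Ongoing remaining)
      | remaining <= credits = Completed
      | otherwise            = Ongoing (remaining - credits)

main :: IO ()
main = print (supplyCreditsSketch 5 [Ongoing 3, Ongoing 10, Completed])
```

Here the first merge finishes, the second progresses from 10 to 5 outstanding credits, and the already completed merge is unaffected.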
newtype NominalDebt
Total merge debt to complete the merge in an incoming run.
This corresponds to the (worst-case, i.e. minimum) number of update operations that must be inserted into the table before we expect the merge to complete.
Constructors
| NominalDebt Int | 
Instances
| NFData NominalDebt | Defined in Database.LSMTree.Internal.IncomingRun. Methods: rnf :: NominalDebt -> () |
| Eq NominalDebt | Defined in Database.LSMTree.Internal.IncomingRun |
| DecodeVersioned NominalDebt | Defined in Database.LSMTree.Internal.Snapshot.Codec. Methods: decodeVersioned :: SnapshotVersion -> Decoder s NominalDebt |
| Encode NominalDebt | Defined in Database.LSMTree.Internal.Snapshot.Codec. Methods: encode :: NominalDebt -> Encoding |
newtype NominalCredits
Merge credits that get supplied to a table's levels.
This corresponds to the number of update operations inserted into the table.
Constructors
| NominalCredits Int | 
Instances
nominalDebtForLevel :: TableConfig -> LevelNo -> NominalDebt
The nominal debt equals the minimum number of credits we will supply before we expect the merge to complete. This is the same as the number of updates in a run that gets moved to this level.
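How nominal credits translate into a merge's own MergeDebt is not spelled out here. One plausible approach, assumed in this sketch, is proportional scaling of the cumulative nominal credits, rounded up, so that supplying the full nominal debt yields exactly the full merge debt. divCeil, scaleToMergeCredits and creditsForStep are hypothetical helpers, not lsm-tree functions.

```haskell
-- Ceiling division, assuming a non-negative numerator and a positive
-- denominator.
divCeil :: Int -> Int -> Int
divCeil a b = (a + b - 1) `div` b

-- Convert a cumulative number of nominal credits into a cumulative number of
-- merge credits, capped at the full merge debt. Assumes nominalDebt > 0.
scaleToMergeCredits :: Int -> Int -> Int -> Int
scaleToMergeCredits nominalDebt mergeDebt nominalCredits =
  divCeil (min nominalDebt nominalCredits * mergeDebt) nominalDebt

-- Merge credits to supply when cumulative nominal credits go from n0 to n1.
-- Scaling the running total (not each step) keeps rounding errors from
-- accumulating.
creditsForStep :: Int -> Int -> Int -> Int -> Int
creditsForStep nominalDebt mergeDebt n0 n1 =
  scaleToMergeCredits nominalDebt mergeDebt n1
    - scaleToMergeCredits nominalDebt mergeDebt n0

main :: IO ()
main = print (zipWith (creditsForStep 8 20) [0, 3, 6] [3, 6, 9])
```

With a nominal debt of 8 and a merge debt of 20, three batches of 3 updates supply 8, 7 and 5 merge credits: exactly 20 in total, even though the last batch overshoots the nominal debt.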
Exported for testing
addWriteBufferEntries :: (MonadSTM m, MonadThrow m, PrimMonad m) => HasFS m h -> ResolveSerialisedValue -> Ref (WriteBufferBlobs m h) -> NumEntries -> WriteBuffer -> Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob) -> m (WriteBuffer, Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob))
Add entries to the write buffer up until a certain write buffer size n.
NOTE: if the write buffer is already larger than n, this is a no-op.
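A pure sketch of this contract, with a Data.Map as the write buffer and Ints for keys and values. addEntries is a hypothetical name, the resolve argument stands in for ResolveSerialisedValue, and blobs and effects are omitted.

```haskell
import qualified Data.Map.Strict as Map

-- Add updates until the buffer reaches size n; return the new buffer and the
-- updates that were not added. Duplicate keys are combined with `resolve`
-- (new value first, existing value second), so they do not grow the buffer.
addEntries
  :: (Int -> Int -> Int)               -- resolve: new value -> old value -> combined
  -> Int                               -- n: target write buffer size
  -> Map.Map Int Int                   -- current write buffer
  -> [(Int, Int)]                      -- batch of updates
  -> (Map.Map Int Int, [(Int, Int)])
addEntries resolve n = go
  where
    go wb [] = (wb, [])
    go wb es@((k, v) : rest)
      | Map.size wb >= n = (wb, es)   -- already at or above n: add nothing
      | otherwise        = go (Map.insertWith resolve k v wb) rest

main :: IO ()
main = do
  -- buffer already larger than n: a no-op
  print (addEntries (+) 2 (Map.fromList [(1, 1), (2, 2), (3, 3)]) [(4, 4)])
  -- the duplicate key 1 is resolved in place; key 3 is left over
  print (addEntries (+) 2 Map.empty [(1, 1), (1, 5), (2, 2), (3, 3)])
```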