{-# LANGUAGE PatternSynonyms #-}

-- | A prototype of an LSM with explicitly scheduled incremental merges.
--
-- Scheduling the incremental merges is about ensuring that the merging
-- work (CPU and I\/O) is spread out evenly over time. This also means that
-- the LSM update operations have a bounded worst-case cost rather than only
-- an amortised one, because each of them does a fixed amount of merging work.
--
-- Another thing this prototype demonstrates is a design for duplicating tables
-- and sharing ongoing incremental merges.
--
-- Finally, it demonstrates a design for table unions, including a
-- representation for in-progress merging trees.
--
-- The merging policy that this prototype uses is power 4 \"lazy levelling\".
-- Power 4 means each level is 4 times bigger than the previous level.
-- Lazy levelling means we use tiering for every level except the last level
-- which uses levelling. Though note that the first level always uses tiering,
-- even if the first level is also the last level. This is to simplify flushing
-- the write buffer: if we used levelling on the first level we would need a
-- code path for merging the write buffer into the first level.
--
module ScheduledMerges (
    -- * Main API
    LSM,
    Key (K), Value (V), resolveValue, Blob (B),
    new,
    LookupResult (..),
    lookup, lookups,
    Op,
    Update (..),
    update, updates,
    insert, inserts,
    delete, deletes,
    mupsert, mupserts,
    supplyMergeCredits,
    duplicate,
    unions,
    Credit,
    Debt,
    remainingUnionDebt,
    supplyUnionCredits,

    -- * Test and trace
    MTree (..),
    logicalValue,
    Representation,
    dumpRepresentation,
    representationShape,
    Event,
    EventAt(..),
    EventDetail(..),
    MergingTree(..),
    MergingTreeState(..),
    PendingMerge(..),
    PreExistingRun(..),
    MergingRun(..),
    MergingRunState(..),
    MergePolicy(..),
    IsMergeType(..),
    TreeMergeType(..),
    LevelMergeType(..),
    MergeCredit(..),
    MergeDebt(..),
    NominalCredit(..),
    NominalDebt(..),
    Run,
    runSize,
    UnionCredits (..),
    supplyCreditsMergingTree,
    UnionDebt(..),
    remainingDebtMergingTree,
    mergek,
    mergeBatchSize,

    -- * Invariants
    Invariant,
    evalInvariant,
    treeInvariant,
    mergeDebtInvariant,
  ) where

import           Prelude hiding (lookup)

import           Data.Bits
import           Data.Foldable (for_, toList, traverse_)
import           Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import           Data.Maybe (catMaybes)
import           Data.STRef

import qualified Control.Exception as Exc (assert)
import           Control.Monad (foldM, forM, when)
import           Control.Monad.ST
import qualified Control.Monad.Trans.Except as E
import           Control.Tracer (Tracer, contramap, traceWith)
import           GHC.Stack (HasCallStack, callStack)

import qualified Test.QuickCheck as QC

-- | A mutable handle to a table. It holds two references: one to the
-- operation 'Counter' (used when tracing) and one to the table's current
-- 'LSMContent'.
data LSM s =
    LSMHandle
      !(STRef s Counter)         -- ^ operation counter
      !(STRef s (LSMContent s))  -- ^ the content of the table

-- | A running count of LSM operations, so that every traced event can carry
-- the number of the operation that caused it. Relating merge events to
-- operation numbers is interesting for a numerical representation like this
-- one; a real implementation would not need it.
type Counter = Int

-- | Everything stored in a table, ordered from the most recently inserted
-- data to the least recently inserted.
data LSMContent s =
    LSMContent
      Buffer          -- ^ level 0: the in-memory write buffer
      (Levels s)      -- ^ the \"regular\" levels 1 and up (on disk in a real
                      --   implementation)
      (UnionLevel s)  -- ^ an optional extra last level, see 'UnionLevel'

-- | The regular levels, ordered from level 1 downwards.
type Levels s = [Level s]

-- | One level of the table. It consists of an incoming run, which is usually
-- several runs being merged, followed by the runs that already reside at this
-- level. Once the incoming merge completes, its result becomes a resident run.
data Level s =
    Level
      !(IncomingRun s)  -- ^ the run (or ongoing merge) entering this level
      ![Run]            -- ^ the resident runs

-- | The run entering a level.
--
-- A run that needs no merging is kept as a 'Single' rather than being wrapped
-- in a 'CompletedMerge'. This has two benefits:
--
-- * it lets us see statically that it is a single run, without reading an
--   'STRef';
-- * it makes it easier to avoid supplying merge credits to it.
--
-- Neither benefit is essential, but together they simplify things somewhat.
data IncomingRun s =
      Merging
        !MergePolicy  -- ^ the policy of the level this merge feeds into
        !NominalDebt
        !(STRef s NominalCredit)
        !(MergingRun LevelMergeType s)
    | Single
        !Run

-- | The merge policy of an LSM level: either tiering or levelling.
--
-- In this design the last level uses levelling and all other levels use
-- tiering. However, the first level always uses tiering, even when it is also
-- the last level (see 'mergePolicyForLevel'). So 'MergePolicy' and
-- 'LevelMergeType' are distinct notions: for example, a first level that is
-- also the last one uses tiering with 'MergeLastLevel'.
--
data MergePolicy = MergePolicyTiering | MergePolicyLevelling
  deriving stock (Eq, Show)

-- | The mutable state of one incremental merge. Merging runs are shared
-- between duplicated tables, so this is also the unit of sharing between them.
--
-- The parameter @t@ records what kind of merge this is, e.g.
-- 'LevelMergeType' or 'TreeMergeType'.
--
data MergingRun t s =
    MergingRun
      !t                          -- ^ the kind of merge
      !MergeDebt
      !(STRef s MergingRunState)  -- ^ the current progress of the merge

-- | The progress of a 'MergingRun': either finished or still ongoing.
data MergingRunState =
      CompletedMerge
        !Run
    | OngoingMerge
        !MergeCredit
        ![Run]  -- ^ inputs of the merge
        Run     -- ^ output of the merge (lazily evaluated)

-- | Merges happen in several parts of the LSM, and each part may need a
-- slightly different merge operation. This class exposes the properties
-- that distinguish them.
class Show t => IsMergeType t where
  -- | Whether this is a merge on the last level, where deletes can be dropped.
  isLastLevel :: t -> Bool
  -- | Whether this merge follows union semantics.
  isUnion     :: t -> Bool

-- | The kinds of merge created as part of a regular (non-union) level.
--
-- A last level merge behaves differently from a mid-level merge: last level
-- merges can actually remove delete operations, whereas mid-level merges must
-- preserve them. This is independent of the level's 'MergePolicy'.
data LevelMergeType = MergeMidLevel | MergeLastLevel
  deriving stock (Eq, Show)

-- | Regular level merges never have union semantics. Only 'MergeLastLevel'
-- merges count as last level merges.
instance IsMergeType LevelMergeType where
  isLastLevel MergeMidLevel  = False
  isLastLevel MergeLastLevel = True

  isUnion _ = False

-- | The kinds of merge created as part of the merging tree.
--
-- Union merges follow the semantics of @Data.Map.unionWith (<>)@. Since
-- the input runs are semantically treated like @Data.Map@s, deletes are ignored
-- and inserts act like mupserts, so they need to be merged monoidally using
-- 'resolveValue'.
--
-- Trees can only exist on the union level, which is the last level.
-- Therefore, tree merges can always drop deletes.
data TreeMergeType = MergeLevel | MergeUnion
  deriving stock (Eq, Show)

-- | Tree merges always live on the (last) union level. Only 'MergeUnion'
-- nodes have union semantics.
instance IsMergeType TreeMergeType where
  isLastLevel _ = True

  isUnion MergeLevel = False
  isUnion MergeUnion = True

-- | An optional extra last level, created as the result of a table union.
-- Rather than only an ongoing merge of several runs, it can hold a whole
-- nested tree of merges. See Note [Table Unions].
data UnionLevel s =
      NoUnion
      -- | The debt is tracked so we can make sure it never increases.
    | Union !(MergingTree s) !(STRef s Debt)

-- | The mutable state of an incremental, tree-shaped, nested merge. It lets us
-- represent a union of entire tables, where each table must itself first be
-- merged into a single run.
--
-- Nesting can be arbitrarily deep. Every input to a union may already contain
-- an in-progress merging tree, which then becomes shared between several
-- tables.
--
-- See Note [Table Unions].
newtype MergingTree s = MergingTree (STRef s (MergingTreeState s))

-- | The progress of a 'MergingTree' node.
data MergingTreeState s =
      CompletedTreeMerge !Run
      -- | Reuses 'MergingRun' (including its 'STRef') so that existing merges
      -- can be shared.
    | OngoingTreeMerge !(MergingRun TreeMergeType s)
    | PendingTreeMerge !(PendingMerge s)

-- | A merge that cannot start yet, because it is waiting for its inputs to
-- complete.
--
-- The inputs may themselves be 'MergingTree's (including their 'STRef'), so
-- that existing unions can be shared.
data PendingMerge s =
      -- | The inputs are the entire content of one table: its (merging) runs,
      -- followed by its union merge if that table already contained a union.
      PendingLevelMerge ![PreExistingRun s] !(Maybe (MergingTree s))
      -- | Every input is a level merge of the entire content of one table.
    | PendingUnionMerge ![MergingTree s]

-- | Similar to an 'IncomingRun', from which these are created, but holding
-- only the information that a 'PendingLevelMerge' needs.
data PreExistingRun s =
      PreExistingRun !Run
    | PreExistingMergingRun !(MergingRun LevelMergeType s)

-- | Views a 'PendingMerge' uniformly as its merge type, its pre-existing runs
-- and its input trees. A 'PendingUnionMerge' has no pre-existing runs, and a
-- 'PendingLevelMerge' has at most one input tree.
pendingContent :: PendingMerge s
               -> (TreeMergeType, [PreExistingRun s], [MergingTree s])
pendingContent (PendingLevelMerge prs t) = (MergeLevel, prs, toList t)
pendingContent (PendingUnionMerge ts)    = (MergeUnion, [],  ts)

{-# COMPLETE PendingMerge #-}
-- | A match-only view of a 'PendingMerge' as its merge type, pre-existing
-- runs and input trees, computed by 'pendingContent'. Matching on this single
-- pattern is exhaustive.
pattern PendingMerge :: TreeMergeType
                     -> [PreExistingRun s]
                     -> [MergingTree s]
                     -> PendingMerge s
pattern PendingMerge mt prs ts <- (pendingContent -> (mt, prs, ts))

-- | A run is an ordered map from keys to operations.
type Run    = Map Key Op
-- | The write buffer has the same representation as a run.
type Buffer = Map Key Op

-- | Turns the write buffer into a run. Both share a representation, so this
-- is the identity.
bufferToRun :: Buffer -> Run
bufferToRun = id

-- | The number of entries (distinct keys) in a run.
runSize :: Run -> Int
runSize = Map.size

-- | The number of entries (distinct keys) in the write buffer.
bufferSize :: Buffer -> Int
bufferSize = Map.size

-- | The operation stored for a key.
type Op = Update Value Blob

-- | Keys of the table.
newtype Key = K Int
  deriving stock (Eq, Ord, Show)
  deriving newtype Enum

-- | Values of the table. Two values for the same key can be combined with
-- 'resolveValue'.
newtype Value  = V Int
  deriving stock (Eq, Show)

-- | Combines two values for the same key monoidally, as required for mupserts
-- and for union merges (where inserts act like mupserts). Here this simply
-- adds the underlying numbers, which is associative.
resolveValue :: Value -> Value -> Value
resolveValue (V x) (V y) = V (x + y)

-- | Blobs, which can accompany values in an 'Op'.
newtype Blob = B Int
  deriving stock (Eq, Show)

-- | The maximum size of each of the 4 tiering runs at a level. At level @n@,
-- the size @s@ of such a run is allowed to be @4^(n-1) < s <= 4^n@.
--
tieringRunSize :: Int -> Int
tieringRunSize n = 4^n

-- | A levelling run takes up the whole level, so it is 4 times as large as a
-- tiering run at the same level: @levellingRunSize n == 4^(n+1)@.
--
levellingRunSize :: Int -> Int
levellingRunSize n = 4^(n+1)

-- | The level whose tiering runs a run of this size fits into. This is the
-- smallest @n >= 1@ with @runSize r <= 4^n@.
--
-- Anything no larger than the write buffer belongs on level 1. For a larger
-- size @s@ this computes @ceiling (logBase 4 s)@ exactly, using integer bit
-- operations: @finiteBitSize s - countLeadingZeros (s-1)@ is the bit length
-- of @s-1@, which equals @ceiling (logBase 2 s)@ for @s >= 2@.
tieringRunSizeToLevel :: Run -> Int
tieringRunSizeToLevel r
  | s <= maxBufferSize = 1  -- level numbers start at 1
  | otherwise          =
      1 + (finiteBitSize s - countLeadingZeros (s-1) - 1) `div` 2
  where
    s = runSize r

-- | The level whose levelling run a run of this size fits into. A levelling
-- run at level @n@ is as big as a tiering run at level @n+1@, hence the
-- subtraction. Since level numbers start at 1, the result is never below 1.
levellingRunSizeToLevel :: Run -> Int
levellingRunSizeToLevel r =
    max 1 (tieringRunSizeToLevel r - 1)  -- level numbers start at 1

-- | The maximum size of the write buffer. It equals the size of a level-1
-- tiering run, i.e. 4.
maxBufferSize :: Int
maxBufferSize = tieringRunSize 1 -- 4

-- | The merge policy for level @ln@, given the regular levels below it and the
-- union level. We use levelling on the last level, unless that is also the
-- first level. A level with a 'Union' below it is never the last level.
mergePolicyForLevel :: Int -> [Level s] -> UnionLevel s -> MergePolicy
mergePolicyForLevel 1 _  _       = MergePolicyTiering
mergePolicyForLevel _ [] NoUnion = MergePolicyLevelling
mergePolicyForLevel _ _  _       = MergePolicyTiering

-- | The merge type for a level, given the regular levels below it and the
-- union level. If there are no further levels, this level is the last one.
-- However, if a 'Union' is present, it acts as another (last) level.
mergeTypeForLevel :: [Level s] -> UnionLevel s -> LevelMergeType
mergeTypeForLevel [] NoUnion = MergeLastLevel
mergeTypeForLevel _  _       = MergeMidLevel

-- | Checks the structural invariants of a table's content.
--
-- For every regular level it checks:
--
-- * the merge policy and merge type of its incoming merge;
-- * the number of resident runs;
-- * the sizes of all runs.
--
-- If there is a union level, it also checks the invariant of the merging tree.
--
-- Note that the invariants rely on the fact that levelling is only used on
-- the last level.
--
invariant :: forall s. LSMContent s -> ST s ()
invariant (LSMContent _ levels ul) = do
    levelsInvariant 1 levels
    case ul of
      NoUnion      -> return ()
      Union tree _ -> expectInvariant (treeInvariant tree)
  where
    levelsInvariant :: Int -> Levels s -> ST s ()
    levelsInvariant !_ [] = return ()

    levelsInvariant !ln (Level ir rs : ls) = do
      mrs <- case ir of
        Single r ->
          return (CompletedMerge r)
        Merging mp _ _ (MergingRun mt _ ref) -> do
          assertST $ ln > 1  -- no merges on level 1
          assertST $ mp == mergePolicyForLevel ln ls ul
          assertST $ mt == mergeTypeForLevel ls ul
          readSTRef ref

      assertST $ length rs <= 3
      expectedRunLengths ln rs ls
      expectedMergingRunLengths ln ir mrs ls

      levelsInvariant (ln+1) ls

    -- All runs within a level "proper" (as opposed to the incoming runs
    -- being merged) should be of the correct size for the level.
    expectedRunLengths :: Int -> [Run] -> [Level s] -> ST s ()
    expectedRunLengths ln rs ls =
      case mergePolicyForLevel ln ls ul of
        -- Levels using levelling have only one (incoming) run, which almost
        -- always consists of an ongoing merge. The exception is when a
        -- levelling run becomes too large and is promoted, in that case
        -- initially there's no merge, but it is still represented as an
        -- 'IncomingRun', using 'Single'. Thus there are no other resident runs.
        MergePolicyLevelling -> assertST $ null rs
        -- Runs in tiering levels usually fit that size, but they can be one
        -- larger, if a run has been held back (creating a 5-way merge).
        MergePolicyTiering   ->
          assertST $ all (\r -> tieringRunSizeToLevel r `elem` [ln, ln+1]) rs
        -- (This is actually still not really true, but will hold in practice.
        -- In the pathological case, all runs passed to the next level can be
        -- factor (5/4) too large, and there the same holding back can lead to
        -- factor (6/4) etc., until at level 12 a run is two levels too large.)

    -- Incoming runs being merged also need to be of the right size, but the
    -- conditions are more complicated.
    expectedMergingRunLengths :: Int -> IncomingRun s -> MergingRunState
                              -> [Level s] -> ST s ()
    expectedMergingRunLengths ln ir mrs ls =
      case mergePolicyForLevel ln ls ul of
        MergePolicyLevelling -> do
          case (ir, mrs) of
            -- A single incoming run (which thus didn't need merging) must be
            -- of the expected size range already.
            (Single r, m) -> do
              assertST $ case m of CompletedMerge{} -> True
                                   OngoingMerge{}   -> False
              assertST $ levellingRunSizeToLevel r == ln

            -- A completed merge for levelling can be of almost any size at all!
            -- It can be smaller, due to deletions in the last level. But it
            -- can't be bigger than would fit into the next level.
            (_, CompletedMerge r) ->
              assertST $ levellingRunSizeToLevel r <= ln+1

            -- An ongoing merge for levelling should have 4 incoming runs of
            -- the right size for the level below (or slightly larger due to
            -- holding back underfull runs), and 1 run from this level,
            -- but the run from this level can be of almost any size for the
            -- same reasons as above. Although if this is the first merge for
            -- a new level, it'll have only 4 runs.
            (_, OngoingMerge _ rs _) -> do
              assertST $ length rs `elem` [4, 5]
              assertST $ all (\r -> runSize r > 0) rs  -- don't merge empty runs
              let incoming = take 4 rs
              let resident = drop 4 rs
              assertST $ all (\r -> tieringRunSizeToLevel r `elem` [ln-1, ln])
                             incoming
              assertST $ all (\r -> levellingRunSizeToLevel r <= ln+1) resident

        MergePolicyTiering ->
          case (ir, mrs, mergeTypeForLevel ls ul) of
            -- A single incoming run (which thus didn't need merging) must be
            -- of the expected size already.
            (Single r, m, _) -> do
              assertST $ case m of CompletedMerge{} -> True
                                   OngoingMerge{}   -> False
              assertST $ tieringRunSizeToLevel r == ln

            -- A completed last level run can be of almost any smaller size due
            -- to deletions, but it can't be bigger than the next level down.
            -- Note that tiering on the last level only occurs when there is
            -- a single level only.
            (_, CompletedMerge r, MergeLastLevel) -> do
              assertST $ ln == 1
              assertST $ tieringRunSizeToLevel r <= ln+1

            -- A completed mid level run is usually of the size for the
            -- level it is entering, but can also be one smaller (in which case
            -- it'll be held back and merged again) or one larger (because it
            -- includes a run that has been held back before).
            (_, CompletedMerge r, MergeMidLevel) ->
              assertST $ tieringRunSizeToLevel r `elem` [ln-1, ln, ln+1]

            -- An ongoing merge for tiering should have 4 incoming runs of
            -- the right size for the level below, and at most 1 run held back
            -- due to being too small (which would thus also be of the size of
            -- the level below).
            (_, OngoingMerge _ rs _, _) -> do
              assertST $ length rs `elem` [4, 5]
              assertST $ all (\r -> tieringRunSizeToLevel r == ln-1) rs

-- | Check the structural invariants of a merging tree, recursing into its
-- children.
--
-- We don't make many assumptions apart from what the types already enforce.
-- In particular, there are no invariants on the progress of the merges,
-- since union merge credits are independent from the tables' regular level
-- merges. The one cross-check we do make is that a tree reporting no
-- remaining debt must actually be completed.
treeInvariant :: MergingTree s -> Invariant s ()
treeInvariant tree@(MergingTree treeState) = do
    node <- liftI (readSTRef treeState)
    checkNode node

    (debt, _) <- liftI (remainingDebtMergingTree tree)
    -- Once no debt is left, the tree has to be in the completed state.
    when (debt <= 0) $
      () <$ isCompletedMergingTree tree
  where
    checkNode (CompletedTreeMerge _) =
      -- We don't require the completed merges to be non-empty, since even
      -- a (last-level) merge of non-empty runs can end up being empty.
      -- In the prototype it would be possible to ensure that empty runs are
      -- immediately trimmed from the tree, but this kind of normalisation
      -- is complicated with sharing. For example, merging runs and
      -- trees are shared, so if one of them completes as an empty run,
      -- all tables referencing it suddenly contain an empty run and would
      -- need to be updated immediately.
      pure ()

    checkNode (OngoingTreeMerge mr) =
      mergeInvariant mr

    checkNode (PendingTreeMerge (PendingLevelMerge preExisting mTree)) = do
      -- Non-empty, but can be just one input (see 'newPendingLevelMerge').
      -- Note that children of a pending merge can be empty runs, as noted
      -- above for 'CompletedTreeMerge'.
      assertI "pending level merges have at least one input" $
        length preExisting + length mTree > 0
      mapM_ checkPreExisting preExisting
      mapM_ treeInvariant mTree

    checkNode (PendingTreeMerge (PendingUnionMerge children)) = do
      assertI "pending union merges are non-trivial (at least two inputs)" $
        length children > 1
      mapM_ treeInvariant children

    -- Plain pre-existing runs carry no invariants of their own; merging
    -- runs are checked like any other merge.
    checkPreExisting (PreExistingRun _)         = pure ()
    checkPreExisting (PreExistingMergingRun mr) = mergeInvariant mr

-- | Check the invariants of a single merging run.
--
-- Completed merges are accepted as-is. An ongoing merge must have credits
-- consistent with its debt (see 'mergeDebtInvariant'), only non-empty inputs,
-- and at least two inputs.
mergeInvariant :: MergingRun t s -> Invariant s ()
mergeInvariant (MergingRun _ debt stateRef) = do
    mergeState <- liftI (readSTRef stateRef)
    case mergeState of
      CompletedMerge _ ->
        pure ()
      OngoingMerge credit inputs _ -> do
        assertI "merge debt & credit invariant"
                (mergeDebtInvariant debt credit)
        assertI "inputs to ongoing merges aren't empty"
                (not (any (\run -> runSize run <= 0) inputs))
        assertI "ongoing merges are non-trivial (at least two inputs)"
                (length inputs >= 2)

-- | Return the result run of a merging run, or fail the invariant check
-- (reporting the remaining debt) if the merge is still ongoing.
isCompletedMergingRun :: MergingRun t s -> Invariant s Run
isCompletedMergingRun (MergingRun _ debt stateRef) = do
    mergeState <- liftI (readSTRef stateRef)
    case mergeState of
      OngoingMerge credit _ _ ->
        let remaining = mergeDebtLeft debt credit
         in failI ("not completed: OngoingMerge with"
                   ++ " remaining debt "
                   ++ show remaining)
      CompletedMerge run ->
        pure run

-- | Return the result run of a merging tree, or fail the invariant check if
-- the tree's merge is ongoing (see 'isCompletedMergingRun') or still pending.
isCompletedMergingTree :: MergingTree s -> Invariant s Run
isCompletedMergingTree (MergingTree stateRef) = do
    treeNode <- liftI (readSTRef stateRef)
    case treeNode of
      PendingTreeMerge _   -> failI "not completed: PendingTreeMerge"
      OngoingTreeMerge mr  -> isCompletedMergingRun mr
      CompletedTreeMerge r -> pure r

-- | Invariant checks: 'ST' computations that can abort with a 'String'
-- describing the violated invariant (see 'assertI' and 'failI').
type Invariant s = E.ExceptT String (ST s)

-- | Succeed if the condition holds; otherwise fail the invariant check with
-- the given message.
assertI :: String -> Bool -> Invariant s ()
assertI msg ok =
    if ok
      then pure ()
      else failI msg

-- | Abort an invariant check with the given message.
failI :: String -> Invariant s a
failI msg = E.throwE msg

-- | Run an 'ST' action inside an invariant check; it never fails the check.
liftI :: ST s a -> Invariant s a
liftI action = E.ExceptT (Right <$> action)

-- | Run an invariant check and return its result, turning a failed check
-- into an 'error' carrying the failure message and the caller's call stack.
expectInvariant :: HasCallStack => Invariant s a -> ST s a
expectInvariant check = do
    outcome <- E.runExceptT check
    case outcome of
      Left  msg    -> error msg
      Right result -> return result

-- | Run an invariant check, returning either the failure message ('Left') or
-- the check's result ('Right').
evalInvariant :: Invariant s a -> ST s (Either String a)
evalInvariant check = E.runExceptT check

-- | 'Exc.assert' with a 'HasCallStack' constraint, so assertion failures
-- report the caller's location.
--
-- 'callStack' just ensures that the 'HasCallStack' constraint is not redundant
-- when compiling with debug assertions disabled.
assert :: HasCallStack => Bool -> a -> a
assert cond val = Exc.assert cond $ const val callStack

-- | 'assert' as an 'ST' action with no other effect; the check happens when
-- the action is run.
assertST :: HasCallStack => Bool -> ST s ()
assertST cond = assert cond (pure ())

-------------------------------------------------------------------------------
-- Merging credits
--

-- | Credits for keeping track of merge progress. These credits correspond
-- directly to merge steps performed.
--
-- We also call these \"physical\" credits (since they correspond to steps
-- done), as opposed to the \"nominal\" credits in 'NominalCredit' and
-- 'NominalDebt'.
type Credit = Int

-- | Debt for keeping track of the total merge work to do, measured in the
-- same units as 'Credit' (merge steps).
type Debt = Int

-- | The credits supplied so far to a merging run, split by whether they have
-- already been turned into merge work.
data MergeCredit =
     MergeCredit {
       -- | Accumulating: credits already used to perform merge steps.
       spentCredits   :: !Credit,
       -- | Fluctuating: credits supplied but not yet used for a batch.
       unspentCredits :: !Credit
     }
  deriving stock Show

-- | The total merge work a merging run requires. It is fixed when the run is
-- created.
newtype MergeDebt =
        MergeDebt {
          -- | Fixed total debt, in merge steps.
          totalDebt :: Debt
        }
  deriving stock Show

-- | The credit state of a merge that has not been supplied any credits yet:
-- nothing spent and nothing unspent.
zeroMergeCredit :: MergeCredit
zeroMergeCredit = MergeCredit 0 0

-- | The consistency condition between a merge's fixed debt and the credits
-- supplied to it so far. Spent credits must be non-negative, and the total
-- supplied (spent plus unspent) must lie between zero and the total debt.
mergeDebtInvariant :: MergeDebt -> MergeCredit -> Bool
mergeDebtInvariant (MergeDebt debt) (MergeCredit spent unspent) =
       spent >= 0
    -- unspentCredits could legitimately be negative, though that does not
    -- happen in this prototype
    && supplied >= 0
    && supplied <= debt
  where
    supplied = spent + unspent

-- | The part of the total debt not yet covered by supplied credits, where
-- "supplied" counts both spent and unspent credits. Asserts that the credits
-- never exceed the debt.
mergeDebtLeft :: HasCallStack => MergeDebt -> MergeCredit -> Debt
mergeDebtLeft (MergeDebt debt) (MergeCredit spent unspent) =
    assert (supplied <= debt) (debt - supplied)
  where
    supplied = spent + unspent

-- | The outcome of paying credits towards a merge debt (see
-- 'paydownMergeDebt'). As credits are paid, debt is reduced in batches of
-- 'mergeBatchSize' steps when sufficient credits have accumulated.
data MergeDebtPaydown =
    -- | The remaining merge debt is fully paid off, potentially with
    -- leftovers. The fields are the part of the newly paid credits applied
    -- to the debt, and the leftover credits that were not needed.
    MergeDebtDischarged !Debt !Credit

    -- | Credits were paid, but not enough for merge debt to be reduced by a
    -- batch of merging work. The field is the updated credit state.
  | MergeDebtPaydownCredited !MergeCredit

    -- | Enough credits were paid to reduce merge debt by performing some
    -- batches of merging work. The fields are the credits spent in this
    -- paydown (a whole number of batches) and the updated credit state.
  | MergeDebtPaydownPerform !Debt !MergeCredit
  deriving stock Show

-- | Pay credits to merge debt, which might trigger performing some merge work
-- in batches. See 'MergeDebtPaydown'.
--
-- The outcomes are checked in order:
--
-- * If all supplied credits (spent, unspent and new) reach the total debt,
--   the debt is discharged and any excess is returned as leftover.
--
-- * Otherwise, if the unspent credits (including the new ones) fill at least
--   one 'mergeBatchSize' batch, as many whole batches as possible are moved
--   from unspent to spent.
--
-- * Otherwise the new credits are just added to the unspent credits.
--
-- Each outcome is checked against a postcondition via 'assert'.
paydownMergeDebt :: MergeDebt -> MergeCredit -> Credit -> MergeDebtPaydown
paydownMergeDebt (MergeDebt debt) (MergeCredit spent unspent) credits
  | supplied >= debt =
      let !leftover = supplied - debt
          !applied  = credits - leftover
       in assert (dischargedOk applied leftover) $
            MergeDebtDischarged applied leftover

  | unspent' >= mergeBatchSize =
      let (!batches, !remainder) = unspent' `divMod` mergeBatchSize
          !steps                 = batches * mergeBatchSize
       in assert (performOk steps remainder) $
            MergeDebtPaydownPerform steps
              (MergeCredit (spent + steps) (unspent' - steps))

  | otherwise =
      assert creditedOk $
        MergeDebtPaydownCredited (MergeCredit spent unspent')
  where
    -- all credits ever supplied, including this payment
    supplied = spent + unspent + credits
    -- unspent credits after adding this payment
    unspent' = unspent + credits

    dischargedOk applied leftover =
         credits >= 0
      && applied >= 0 && leftover >= 0
      && credits == applied + leftover
      && spent + unspent + applied == debt

    performOk steps remainder =
      let spentAfter   = spent + steps
          unspentAfter = unspent' - steps
       in credits >= 0
       && unspentAfter == remainder
       && supplied == spentAfter + unspentAfter
       && supplied < debt

    creditedOk =
         credits >= 0
      && supplied < debt

-- | The granularity of merge work: unspent credits are only turned into
-- merge steps in whole multiples of this many (see 'paydownMergeDebt').
mergeBatchSize :: Int
mergeBatchSize = 32


-------------------------------------------------------------------------------
-- Merging run abstraction
--

-- | Create a new merging run over the given input runs.
--
-- Expects at least two inputs. If at most one input is non-empty, no merging
-- is needed and the result is immediately a 'CompletedMerge' (re-using that
-- input, or the first empty one). Otherwise the result is an 'OngoingMerge'
-- with zero credit, whose (physical) debt is the sum of the non-empty
-- inputs' sizes. The merged run itself is computed lazily.
newMergingRun :: IsMergeType t => t -> [Run] -> ST s (MergingRun t s)
newMergingRun mergeType runs = do
    assertST $ length runs > 1
    -- in some cases, no merging is required at all
    (debt, state) <- case filter (\r -> runSize r > 0) runs of
      [] -> case runs of
        -- all inputs are empty: just re-use the first (empty) input
        r : _ -> return (runSize r, CompletedMerge r)
        -- Only reachable when debug assertions are disabled. Fail right here
        -- instead of storing a thunk in the STRef that would blow up later,
        -- far away from the faulty call.
        []    -> error "newMergingRun: expected at least two input runs"
      [r] -> return (runSize r, CompletedMerge r)
      rs  -> do
        -- The (physical) debt is always exactly the cost (merge steps),
        -- which is the sum of run lengths in elements.
        let !cost  = sum (map runSize rs)
            merged = mergek mergeType rs  -- deliberately lazy
        return (cost, OngoingMerge zeroMergeCredit rs merged)
    MergingRun mergeType (MergeDebt debt) <$> newSTRef state

-- | Merge several runs into one, resolving entries for the same key.
--
-- Entries are combined with 'combineUnion' for union merge types and with
-- 'combine' otherwise. For last-level merge types, 'Delete' entries are
-- dropped from the result (presumably because nothing older remains for
-- them to shadow).
mergek :: IsMergeType t => t -> [Run] -> Run
mergek t = dropDeletes . Map.unionsWith resolve
  where
    resolve
      | isUnion t = combineUnion
      | otherwise = combine

    dropDeletes
      | isLastLevel t = Map.filter (/= Delete)
      | otherwise     = id

-- | Combines two entries that have been performed one after another.
-- Therefore, the newer one (first argument) overwrites the older one, except
-- for 'Mupsert', which modifies it. Only take a blob from the left entry.
--
-- A 'Mupsert' resolves its value against an older 'Insert' or 'Mupsert' via
-- 'resolveValue'. Against an older 'Delete' it becomes a plain 'Insert'.
combine :: Op -> Op -> Op
combine (Mupsert v) older = case older of
    Insert v' _ -> Insert (resolveValue v v') Nothing
    Delete      -> Insert v Nothing
    Mupsert v'  -> Mupsert (resolveValue v v')
-- 'Insert' and 'Delete' simply overwrite whatever came before.
combine newer _ = newer

-- | Combines two entries of runs that have been 'union'ed together. If either
-- one has a value, the result should have a value (represented by 'Insert').
-- If both have a value, these values get combined monoidally via
-- 'resolveValue'. Only take a blob from the left entry.
--
-- See 'MergeUnion'.
combineUnion :: Op -> Op -> Op
-- A 'Delete' on one side contributes no value: keep the other side, turning
-- a bare 'Mupsert' into an 'Insert'.
combineUnion Delete old = case old of
    Mupsert v -> Insert v Nothing
    _         -> old
combineUnion new_ Delete = case new_ of
    Mupsert u -> Insert u Nothing
    _         -> new_
-- Both sides have a value: combine them, keeping the left blob (if any).
combineUnion (Mupsert v')   (Mupsert v ) = Insert (resolveValue v' v) Nothing
combineUnion (Mupsert v')   (Insert v _) = Insert (resolveValue v' v) Nothing
combineUnion (Insert v' b') (Mupsert v ) = Insert (resolveValue v' v) b'
combineUnion (Insert v' b') (Insert v _) = Insert (resolveValue v' v) b'

-- | Return the result run of a merging run, raising an 'error' (with the
-- caller's call stack) if the merge has not completed yet.
expectCompletedMergingRun :: HasCallStack => MergingRun t s -> ST s Run
expectCompletedMergingRun mr = expectInvariant (isCompletedMergingRun mr)

-- | Supply credits to a single merging run and return the credits left over.
--
-- The work is wrapped in 'checked' together with 'remainingDebtMergingRun'.
-- The outcome depends on the state stored in the run's reference:
--
-- * already 'CompletedMerge': every supplied credit is returned unused;
-- * 'OngoingMerge': the credits are applied via 'paydownMergeDebt'.
--   If that discharges the debt, the run is marked 'CompletedMerge' and the
--   surplus is returned. Otherwise the updated 'MergeCredit' is stored and
--   @0@ is returned.
supplyCreditsMergingRun :: Credit -> MergingRun t s -> ST s Credit
supplyCreditsMergingRun = checked remainingDebtMergingRun payIn
  where
    payIn credits (MergingRun _ mergeDebt ref) =
      readSTRef ref >>= \case
        CompletedMerge{} -> return credits
        OngoingMerge mergeCredit rs r -> do
          -- Store the new credit state while the merge stays in progress;
          -- all supplied credits were absorbed, so nothing is left over.
          let stillOngoing credit' = do
                writeSTRef ref (OngoingMerge credit' rs r)
                return 0
          case paydownMergeDebt mergeDebt mergeCredit credits of
            MergeDebtDischarged _ leftover -> do
              writeSTRef ref (CompletedMerge r)
              return leftover
            MergeDebtPaydownCredited mergeCredit' ->
              stillOngoing mergeCredit'
            MergeDebtPaydownPerform _mergeSteps mergeCredit' ->
              -- we're not doing any actual merging
              -- just tracking what we would do
              stillOngoing mergeCredit'

-- | Total credit supplied to a merging run so far.
--
-- For a completed merge this is the run's 'totalDebt'. For an ongoing merge
-- it is the sum of the spent and unspent credits recorded in its
-- 'MergeCredit'.
suppliedCreditMergingRun :: MergingRun t s -> ST s Credit
suppliedCreditMergingRun (MergingRun _ debt ref) = do
    mrs <- readSTRef ref
    case mrs of
      CompletedMerge{} ->
        let MergeDebt { totalDebt = total } = debt
         in return total
      OngoingMerge MergeCredit { spentCredits = spent
                               , unspentCredits = unspent } _ _ ->
        return (spent + unspent)

-------------------------------------------------------------------------------
-- LSM handle
--

-- | Create a fresh, empty table.
--
-- The update counter starts at @0@. The content has an empty write buffer,
-- no levels and no union level.
new :: ST s (LSM s)
new =
    LSMHandle
      <$> newSTRef 0
      <*> newSTRef (LSMContent Map.empty [] NoUnion)

-- | Insert a batch of key\/value\/blob triples, in list order, using
-- 'updates'.
inserts :: Tracer (ST s) Event -> LSM s -> [(Key, Value, Maybe Blob)] -> ST s ()
inserts tr lsm = updates tr lsm . map toInsert
  where
    toInsert (k, v, b) = (k, Insert v b)

-- | Insert a single key with a value and an optional blob.
insert :: Tracer (ST s) Event -> LSM s -> Key -> Value -> Maybe Blob -> ST s ()
insert tr lsm k v = update tr lsm k . Insert v

-- | Delete a batch of keys, in list order, using 'updates'.
deletes :: Tracer (ST s) Event -> LSM s -> [Key] ->  ST s ()
deletes tr lsm = updates tr lsm . map (\k -> (k, Delete))

-- | Delete a single key by issuing a 'Delete' update for it.
delete :: Tracer (ST s) Event -> LSM s -> Key ->  ST s ()
delete tr lsm k =
    update tr lsm k Delete

-- | Issue a 'Mupsert' for each key\/value pair, in list order, using
-- 'updates'.
mupserts :: Tracer (ST s) Event -> LSM s -> [(Key, Value)] -> ST s ()
mupserts tr lsm = updates tr lsm . map (fmap Mupsert)
  -- 'fmap' on a pair maps its second component: (k, v) becomes (k, Mupsert v)

-- | Issue a single 'Mupsert' update for a key.
mupsert :: Tracer (ST s) Event -> LSM s -> Key -> Value -> ST s ()
mupsert tr lsm k = update tr lsm k . Mupsert

-- | A single update operation on a key.
--
-- All constructor fields are strict.
data Update v b
  = Insert !v !(Maybe b)  -- ^ set the value, with an optional blob
  | Mupsert !v            -- ^ a value to be combined with an existing one
  | Delete                -- ^ remove the key
  deriving stock (Show, Eq)

-- | Apply a batch of updates one after another, in list order, via 'update'.
updates :: Tracer (ST s) Event -> LSM s -> [(Key, Op)] -> ST s ()
updates tr lsm kops =
    mapM_ (\(k, op) -> update tr lsm k op) kops

-- | Apply one update to the table.
--
-- Each call does the following, in order:
--
-- 1. bumps the update counter;
-- 2. supplies one nominal credit to the regular levels;
-- 3. checks 'invariant' on the content read at the start;
-- 4. combines the new operation into the write buffer with 'combine'
--    ('Map.insertWith' passes the new op as the first argument).
--
-- When the resulting buffer reaches 'maxBufferSize', it is turned into a run
-- and pushed into the levels by 'increment'. 'increment' receives the counter
-- value read before this call's bump. The buffer is then emptied.
update :: Tracer (ST s) Event -> LSM s -> Key -> Op -> ST s ()
update tr (LSMHandle scr lsmr) k op = do
    counterBefore <- readSTRef scr
    content@(LSMContent wb ls unionLevel) <- readSTRef lsmr
    modifySTRef' scr (+1)
    supplyCreditsLevels (NominalCredit 1) ls
    invariant content
    let wb' = Map.insertWith combine k op wb
    newContent <-
      if bufferSize wb' < maxBufferSize
        then return (LSMContent wb' ls unionLevel)
        else do
          -- the write buffer is full: flush it into the levels
          ls' <- increment tr counterBefore (bufferToRun wb') ls unionLevel
          let flushed = LSMContent Map.empty ls' unionLevel
          invariant flushed
          return flushed
    writeSTRef lsmr newContent

-- | Supply nominal merge credits to the table's regular levels.
--
-- This also bumps the update counter. Afterwards it checks 'invariant' on the
-- content read at the start. The write buffer and the union level are not
-- touched.
supplyMergeCredits :: LSM s -> NominalCredit -> ST s ()
supplyMergeCredits (LSMHandle counterRef contentRef) credits = do
    content@(LSMContent _ levels _) <- readSTRef contentRef
    modifySTRef' counterRef (+1)
    supplyCreditsLevels credits levels
    invariant content

-- | The outcome of looking up a single key.
data LookupResult v b
  = NotFound             -- ^ the key was not found
  | Found !v !(Maybe b)  -- ^ the value, with its optional blob
  deriving stock (Show, Eq)

-- | Look up a batch of keys.
--
-- The table content is read once and the levels are flattened once. Every
-- key is then resolved with 'doLookup' against that same write buffer, run
-- list and union level. Results are returned in the order of the keys.
lookups :: LSM s -> [Key] -> ST s [LookupResult Value Blob]
lookups (LSMHandle _ lsmr) ks = do
    LSMContent wb ls ul <- readSTRef lsmr
    levelRuns <- flattenLevels ls
    let runs = concat levelRuns
    mapM (doLookup wb runs ul) ks

-- | Look up a single key.
--
-- Reads the table content, flattens its levels into a single run list, and
-- resolves the key with 'doLookup'.
lookup :: LSM s -> Key -> ST s (LookupResult Value Blob)
lookup (LSMHandle _ lsmr) k = do
    LSMContent wb ls ul <- readSTRef lsmr
    levelRuns <- flattenLevels ls
    doLookup wb (concat levelRuns) ul k

-- | Create an independent handle whose initial content is the current content
-- of the given table.
--
-- The new handle gets its own update counter, starting at @0@, and its own
-- content reference. The original handle's counter is ignored.
duplicate :: LSM s -> ST s (LSM s)
duplicate (LSMHandle _scr lsmr) = do
    counter    <- newSTRef 0
    snapshot   <- readSTRef lsmr
    contentRef <- newSTRef snapshot
    return (LSMHandle counter contentRef)
    -- it's that simple here, because we share all the pure value and all the
    -- STRefs and there's no ref counting to be done

-- | Similar to @Data.Map.unionWith@.
--
-- A call to 'unions' itself is not expensive, as the input tables are not
-- immediately merged. Instead, it creates a representation of an in-progress
-- merge that can be performed incrementally (somewhat similar to a thunk).
--
-- The more merge work remains, the more expensive are lookups on the table.
--
-- The resulting table has an empty write buffer and no regular levels. All
-- of its content lives in the union level. If 'newPendingUnionMerge' yields
-- no tree, the union level is 'NoUnion'. Otherwise the union level records the
-- tree together with a reference initialised to its current remaining debt.
-- The new table's update counter starts at @0@.
unions :: [LSM s] -> ST s (LSM s)
unions lsms = do
    trees <- forM lsms $ \(LSMHandle _ lsmr) ->
      contentToMergingTree =<< readSTRef lsmr
    -- TODO: if only one table is non-empty, we don't have to create a Union,
    -- we can just duplicate the table.
    unionLevel <- newPendingUnionMerge (catMaybes trees) >>= \case
      Nothing -> return NoUnion
      Just tree -> do
        debt <- fst <$> remainingDebtMergingTree tree
        Union tree <$> newSTRef debt
    lsmr <- newSTRef (LSMContent Map.empty [] unionLevel)
    c    <- newSTRef 0
    return (LSMHandle c lsmr)

-- | The /current/ upper bound on the number of 'UnionCredits' that have to be
-- supplied before a table created by 'unions' has completed its union merge.
--
-- The union debt is the number of merging steps that need to be performed /at
-- most/ until the delayed work of performing 'unions' is completed. This
-- includes the cost of completing merges that were part of the union's input
-- tables.
newtype UnionDebt = UnionDebt Debt
  deriving stock (Show, Eq, Ord)
  deriving newtype Num

-- | Return the current union debt. This debt can be reduced until it is paid
-- off using 'supplyUnionCredits'.
--
-- A table without a union level has no union debt. Otherwise the debt is
-- obtained through 'checkedUnionDebt', which also refreshes the stored debt.
remainingUnionDebt :: LSM s -> ST s UnionDebt
remainingUnionDebt (LSMHandle _ lsmr) =
    readSTRef lsmr >>= \(LSMContent _ _ ul) ->
      fmap UnionDebt $ case ul of
        NoUnion            -> return 0
        Union tree debtRef -> checkedUnionDebt tree debtRef

-- | Credits are used to pay off 'UnionDebt', completing the union merge
-- created by 'unions' in the process.
--
-- A union credit corresponds to a single merging step being performed.
newtype UnionCredits = UnionCredits Credit
  deriving stock (Show, Eq, Ord)
  deriving newtype Num

-- | Supply union credits to reduce union debt.
--
-- Supplying union credits leads to union merging work being performed in
-- batches. This reduces the union debt returned by 'remainingUnionDebt'. Union
-- debt will be reduced by /at least/ the number of supplied union credits. It
-- is therefore advisable to query 'remainingUnionDebt' every once in a while to
-- see what the current debt is.
--
-- This function returns any surplus of union credits as /leftover/ credits when
-- a union has finished. In particular, if the returned number of credits is
-- positive, then the union is finished. A result of @0@ does not by itself
-- say whether the union has finished.
--
-- Non-positive inputs are a no-op that returns @0@. A table without a union
-- level hands back all supplied credits.
supplyUnionCredits :: LSM s -> UnionCredits -> ST s UnionCredits
supplyUnionCredits (LSMHandle scr lsmr) (UnionCredits credits)
  | credits <= 0 = return (UnionCredits 0)
  | otherwise = do
    content@(LSMContent _ _ ul) <- readSTRef lsmr
    UnionCredits <$> case ul of
      NoUnion ->
        return credits
      Union tree debtRef -> do
        modifySTRef' scr (+1)
        _debt <- checkedUnionDebt tree debtRef  -- just to make sure it's checked
        c' <- supplyCreditsMergingTree credits tree
        debt' <- checkedUnionDebt tree debtRef
        -- leftover credits are only allowed once the debt is fully paid,
        -- which is why a positive result implies the union has finished
        when (debt' > 0) $
          assertST $ c' == 0  -- should have spent these credits
        invariant content
        return c'

-- TODO: At some point the completed merging tree should be moved into the
-- regular levels, so it can be merged with other runs and last level merges can
-- happen again to drop deletes. Also, lookups then don't need to handle the
-- merging tree any more. There are two possible strategies:
--
-- 1. As soon as the merging tree completes, move the resulting run to the
--    regular levels. However, its size generally does not fit the last level,
--    which requires relaxing 'invariant' and adjusting 'increment'.
--
--    If the run is much larger than the resident and incoming runs of the last
--    level, it should also not be included into a merge yet, as that merge
--    would be expensive, but offer very little potential for compaction (the
--    run from the merging tree is already compacted after all). So it needs to
--    be bumped to the next level instead.
--
-- 2. Initially leave the completed run in the union level. Then every time a
--    new last level merge is created in 'increment', check if there is a
--    completed run in the union level with a size that fits the new merge. If
--    yes, move it over.

-- | Like 'remainingDebtMergingTree', but additionally asserts that the debt
-- never increases.
--
-- The freshly computed debt is compared against the value stored in the
-- reference and then written back, so the next call compares against it.
checkedUnionDebt :: MergingTree s -> STRef s Debt -> ST s Debt
checkedUnionDebt tree debtRef = do
    previous <- readSTRef debtRef
    current  <- fst <$> remainingDebtMergingTree tree
    assertST (current <= previous)
    writeSTRef debtRef current
    return current

-------------------------------------------------------------------------------
-- Lookups
--

-- | The result of a lookup accumulated so far: 'Nothing' if no operation for
-- the key has been found yet, otherwise the combination of the operations
-- found (see 'updateAcc').
type LookupAcc = Maybe Op

-- | Fold one more operation into an accumulated lookup result.
--
-- The accumulator holds the more recent operation, so it is passed as the
-- first argument of the combining function and the new (older) operation as
-- the second. An empty accumulator simply becomes the new operation.
updateAcc :: (Op -> Op -> Op) -> LookupAcc -> Op -> LookupAcc
updateAcc f acc old =
    maybe (Just old) (\newer -> Just (f newer old)) acc

-- | Combine the lookup results of the children of a 'LookupNode'.
--
-- Children without a result are skipped. The remaining operations are folded
-- left to right, with the operation accumulated so far as the first argument,
-- using 'combine' for level merges and 'combineUnion' for union merges.
mergeAcc :: TreeMergeType -> [LookupAcc] -> LookupAcc
mergeAcc mt accs =
    case catMaybes accs of
      []             -> Nothing
      first : others -> Just (foldl combineFor first others)
  where
    combineFor :: Op -> Op -> Op
    combineFor = case mt of
      MergeLevel -> combine
      MergeUnion -> combineUnion

-- | We handle lookups by accumulating results by going through the runs from
-- most recent to least recent, starting with the write buffer.
--
-- If the table has a union level, its merging tree is searched as well. Its
-- result is combined, as in a level merge, with the (more recent) result of
-- the write buffer and regular runs, unless both of those are empty.
--
-- In the real implementation, this is done not on an individual 'LookupAcc',
-- but one for each key, i.e. @Vector (Maybe Entry)@.
doLookup :: Buffer -> [Run] -> UnionLevel s -> Key -> ST s (LookupResult Value Blob)
doLookup wb runs ul k =
    case ul of
      NoUnion ->
        pure (toResult regularAcc)
      Union tree _ -> do
        batches <- buildLookupTree tree
        let treeResults :: LookupTree LookupAcc
            treeResults = fmap (lookupBatch Nothing k) batches
            combined
              | null wb && null runs = treeResults
              | otherwise =
                  LookupNode MergeLevel [LookupBatch regularAcc, treeResults]
        pure (toResult (foldLookupTree combined))
  where
    -- Result of searching the write buffer, then the regular runs.
    regularAcc :: LookupAcc
    regularAcc = lookupBatch (Map.lookup k wb) k runs

    toResult :: LookupAcc -> LookupResult Value Blob
    toResult Nothing             = NotFound
    toResult (Just (Insert v b)) = Found v b
    toResult (Just (Mupsert v))  = Found v Nothing
    toResult (Just Delete)       = NotFound

-- | Perform a batch of lookups, accumulating the result onto an initial
-- 'LookupAcc'.
--
-- The runs are visited in list order. Each run that contains the key
-- contributes its operation via 'updateAcc' with 'combine'; runs without the
-- key leave the accumulator untouched.
--
-- In a real implementation, this would take all keys at once and be in IO.
lookupBatch :: LookupAcc -> Key -> [Run] -> LookupAcc
lookupBatch acc k rs = foldl step acc rs
  where
    step a r = maybe a (updateAcc combine a) (Map.lookup k r)

-- | A tree of lookup work (or, after mapping, of lookup results).
--
-- A 'LookupBatch' is a leaf holding one batch; a 'LookupNode' records which
-- kind of merge its children's results have to be combined with.
data LookupTree a
  = LookupBatch a
  | LookupNode TreeMergeType [LookupTree a]
  deriving stock (Functor)

-- | Build the tree of run batches to do lookups on for a merging tree.
--
-- Lookups are done on the runs at the leaves, and the resulting 'LookupAcc's
-- are recursively combined using 'mergeAcc', which picks the combining
-- function based on each node's 'TreeMergeType'.
--
-- Doing this naively would result in a call to 'lookupBatch' and creation of
-- a 'LookupAcc' for each run in the tree. However, runs that are inputs to the
-- same level merge (the inputs of an ongoing 'MergeLevel' merging run, or the
-- pre-existing runs of a 'PendingLevelMerge') are combined into a single
-- batch of runs.
--
-- For example, this means that if we union two tables (which themselves don't
-- have a union level) and then do lookups, two batches of lookups have to be
-- performed (plus a batch for the table's regular levels if it has been updated
-- after the union).
--
-- TODO: we can still improve the batching, for example combining the child of
-- PendingLevelMerge with the pre-existing runs when it is already completed.
buildLookupTree :: MergingTree s -> ST s (LookupTree [Run])
buildLookupTree = go
  where
    go :: MergingTree s -> ST s (LookupTree [Run])
    go (MergingTree treeState) = do
      treeSt <- readSTRef treeState
      case treeSt of
        CompletedTreeMerge r ->
          pure (batchOf r)

        OngoingTreeMerge (MergingRun mt _ mergeState) -> do
          runSt <- readSTRef mergeState
          case runSt of
            CompletedMerge r ->
              pure (batchOf r)
            OngoingMerge _ rs _ ->
              case mt of
                -- Inputs of a level merge are looked up as one batch.
                MergeLevel -> pure (LookupBatch rs)
                -- Inputs of a union merge each need their own batch.
                MergeUnion -> pure (LookupNode MergeUnion (map batchOf rs))

        PendingTreeMerge (PendingLevelMerge prs mtree) -> do
          -- All pre-existing runs are combined into a single batch.
          flattened <- traverse flattenPreExistingRun prs
          let preExisting = LookupBatch (concat flattened)
          case mtree of
            Nothing -> pure preExisting
            Just t  -> do
              child <- go t
              pure (LookupNode MergeLevel [preExisting, child])

        PendingTreeMerge (PendingUnionMerge trees) ->
          fmap (LookupNode MergeUnion) (traverse go trees)

    batchOf :: Run -> LookupTree [Run]
    batchOf r = LookupBatch [r]

-- | Collapse a tree of lookup results into a single result, combining the
-- children of each node with 'mergeAcc' according to the node's merge type.
foldLookupTree :: LookupTree LookupAcc -> LookupAcc
foldLookupTree (LookupBatch acc)        = acc
foldLookupTree (LookupNode mt children) =
    mergeAcc mt (fmap foldLookupTree children)

-------------------------------------------------------------------------------
-- Nominal credits
--

-- | Nominal credit is the credit supplied to each level as we insert update
-- operations, one credit per update operation inserted.
--
-- Nominal credit must be supplied up to the 'NominalDebt' to ensure the merge
-- is complete.
--
-- Nominal credits are of a similar order of magnitude to physical credits (see
-- 'Credit') but not the same, and we have to scale linearly to convert between
-- them. Physical credits are the actual number of inputs to the merge, which
-- may be somewhat more or somewhat less than the number of update operations
-- we will insert before we need the merge to be complete.
--
newtype NominalCredit
  = NominalCredit Credit
  deriving stock (Show)

-- | The nominal debt for a merging run is the worst case (minimum) number of
-- update operations we expect to insert before we expect the merge to be
-- complete.
--
-- We require that an equal amount of nominal credit is supplied before we can
-- expect a merge to be complete.
--
-- We scale linearly to convert nominal credits to physical credits, such that
-- the nominal debt and physical debt are both considered \"100%\", and so that
-- both debts are paid off at exactly the same time.
--
newtype NominalDebt
  = NominalDebt Credit
  deriving stock (Show)

-- TODO: If there is a UnionLevel, there is no (more expensive) last level merge
-- in the regular levels, so a little less merging work is required than if
-- there was no UnionLevel. It might be a good idea to spend this "saved" work
-- on the UnionLevel instead. This makes future lookups cheaper and ensures that
-- we can get rid of the UnionLevel at some point, even if a user just keeps
-- inserting without calling 'supplyUnionCredits'.

-- | Supply the given nominal credit to every level that has an ongoing merge.
--
-- For each such level, the nominal deposit is added to the level's nominal
-- credit (clamped to its nominal debt by 'depositNominalCredit'). It is then
-- scaled to the physical credit the merge should have received by now, and
-- only the shortfall relative to the merge's current physical credit is
-- actually supplied.
--
-- There is a potential race in between deciding how much physical credit to
-- supply, and then supplying it. That's because we read the "current"
-- (absolute) physical credits, decide how much extra (relative) credits to
-- supply and then do the transaction to supply the extra (relative) credits.
-- In between the reading and supplying the current (absolute) physical credits
-- could have changed due to another thread doing a merge on a different table
-- handle.
--
-- This race is relatively benign. When it happens, we will supply more credit
-- to the merge than either thread intended, however, next time either thread
-- comes round they'll find the merge has more physical credits and will thus
-- supply less or none. The only minor problem is in asserting that we don't
-- supply more physical credits than the debt limit.
--
-- There is a trade-off, we could supply absolute physical credit to the
-- merging run, and let it calculate the relative credit as part of the credit
-- transaction. However, we would also need to support relative credit for the
-- union merges, which do not have any notion of nominal credit and only work
-- in terms of relative physical credit. So we can have a simple relative
-- physical credit and rare benign races, or a more complex scheme for
-- contributing physical credits either as absolute or relative values.
supplyCreditsLevels :: NominalCredit -> Levels s -> ST s ()
supplyCreditsLevels nominalDeposit = traverse_ supplyLevel
  where
    -- A level whose incoming run is a single run has no merge to progress.
    supplyLevel :: Level s -> ST s ()
    supplyLevel (Level (Single{}) _rs) = pure ()
    supplyLevel (Level (Merging _mp nominalDebt nominalCreditVar
                                mr@(MergingRun _ physicalDebt _)) _rs) = do
      nominalCredit <- depositNominalCredit nominalDebt nominalCreditVar
                                            nominalDeposit
      currentCredit <- suppliedCreditMergingRun mr
      let -- Our target could actually be less than the actual current
          -- physical credit if other tables were contributing credits to
          -- the shared merge.
          !targetCredit    = scaleNominalToPhysicalCredit
                               nominalDebt physicalDebt nominalCredit
          !physicalDeposit = targetCredit - currentCredit

      -- So we may have a zero or negative deposit, which we ignore.
      when (physicalDeposit > 0) $ do
        leftoverCredits <- supplyCreditsMergingRun physicalDeposit mr
        -- For merges at ordinary levels (not unions) we expect to hit the
        -- debt limit exactly and not exceed it. However if we had a race on
        -- supplying credit then we could go over (which is not a problem).
        -- We can detect such races if the credit afterwards is not the
        -- amount that we credited. This is all just for sanity checking.
        creditAfter <- suppliedCreditMergingRun mr
        assert (leftoverCredits == 0 || targetCredit /= creditAfter)
               (pure ())

-- | Convert nominal credit into the corresponding amount of physical credit,
-- rounding down: @floor (nominalCredit * physicalDebt / nominalDebt)@.
--
-- The nominal and physical debts are both treated as \"100%\", so supplying
-- the full nominal debt yields the full physical debt.
--
-- NOTE(review): presumably the nominal debt is always positive; a zero
-- nominal debt makes the 'Rational' division throw.
scaleNominalToPhysicalCredit ::
     NominalDebt
  -> MergeDebt
  -> NominalCredit
  -> Credit
scaleNominalToPhysicalCredit (NominalDebt nominalDebt)
                             MergeDebt { totalDebt = physicalDebt }
                             (NominalCredit nominalCredit) =
    floor (paidFraction * toRational physicalDebt)
  where
    -- Fraction of the nominal debt paid so far, computed exactly.
    -- This specification using Rational as an intermediate representation
    -- can be implemented efficiently using only integer operations.
    paidFraction :: Rational
    paidFraction = toRational nominalCredit / toRational nominalDebt

-- | Add a nominal deposit to the credit stored in the given reference, clamped
-- to the nominal debt, and return the new (stored) credit.
depositNominalCredit ::
     NominalDebt
  -> STRef s NominalCredit
  -> NominalCredit
  -> ST s NominalCredit
depositNominalCredit (NominalDebt nominalDebt)
                     nominalCreditVar
                     (NominalCredit deposit) = do
    NominalCredit current <- readSTRef nominalCreditVar
    -- Depositing _could_ leave the credit higher than the debt, because
    -- sometimes under-full runs mean we don't shuffle runs down the levels
    -- as quickly as the worst case. So here we do just drop excess nominal
    -- credits.
    let !updated = capAtDebt (current + deposit)
    updated <$ writeSTRef nominalCreditVar updated
  where
    capAtDebt :: Credit -> NominalCredit
    capAtDebt = NominalCredit . min nominalDebt

-------------------------------------------------------------------------------
-- Updates
--

increment :: forall s. Tracer (ST s) Event
          -> Counter -> Run -> Levels s -> UnionLevel s -> ST s (Levels s)
increment :: forall s.
Tracer (ST s) Event
-> Int -> Buffer -> Levels s -> UnionLevel s -> ST s (Levels s)
increment Tracer (ST s) Event
tr Int
sc Buffer
run0 Levels s
ls0 UnionLevel s
ul = do
    Int -> [Buffer] -> Levels s -> ST s (Levels s)
go Int
1 [Buffer
run0] Levels s
ls0
  where
    mergeTypeFor :: Levels s -> LevelMergeType
    mergeTypeFor :: Levels s -> LevelMergeType
mergeTypeFor Levels s
ls = Levels s -> UnionLevel s -> LevelMergeType
forall s. [Level s] -> UnionLevel s -> LevelMergeType
mergeTypeForLevel Levels s
ls UnionLevel s
ul

    go :: Int -> [Run] -> Levels s -> ST s (Levels s)
    go :: Int -> [Buffer] -> Levels s -> ST s (Levels s)
go !Int
ln [Buffer]
incoming [] = do
        let mergePolicy :: MergePolicy
mergePolicy = Int -> Levels s -> UnionLevel s -> MergePolicy
forall s. Int -> [Level s] -> UnionLevel s -> MergePolicy
mergePolicyForLevel Int
ln [] UnionLevel s
ul
        Tracer (ST s) EventDetail -> EventDetail -> ST s ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith Tracer (ST s) EventDetail
tr' EventDetail
AddLevelEvent
        IncomingRun s
ir <- Tracer (ST s) EventDetail
-> Int
-> MergePolicy
-> LevelMergeType
-> [Buffer]
-> ST s (IncomingRun s)
forall s.
Tracer (ST s) EventDetail
-> Int
-> MergePolicy
-> LevelMergeType
-> [Buffer]
-> ST s (IncomingRun s)
newLevelMerge Tracer (ST s) EventDetail
tr' Int
ln MergePolicy
mergePolicy (Levels s -> LevelMergeType
mergeTypeFor []) [Buffer]
incoming
        Levels s -> ST s (Levels s)
forall a. a -> ST s a
forall (m :: * -> *) a. Monad m => a -> m a
return (IncomingRun s -> [Buffer] -> Level s
forall s. IncomingRun s -> [Buffer] -> Level s
Level IncomingRun s
ir [] Level s -> Levels s -> Levels s
forall a. a -> [a] -> [a]
: [])
      where
        tr' :: Tracer (ST s) EventDetail
tr' = (EventDetail -> Event)
-> Tracer (ST s) Event -> Tracer (ST s) EventDetail
forall a' a. (a' -> a) -> Tracer (ST s) a -> Tracer (ST s) a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
contramap (Int -> Int -> EventDetail -> Event
forall e. Int -> Int -> e -> EventAt e
EventAt Int
sc Int
ln) Tracer (ST s) Event
tr

    go !Int
ln [Buffer]
incoming (Level IncomingRun s
ir [Buffer]
rs : Levels s
ls) = do
      Buffer
r <- case IncomingRun s
ir of
        Single Buffer
r -> Buffer -> ST s Buffer
forall a. a -> ST s a
forall (m :: * -> *) a. Monad m => a -> m a
return Buffer
r
        Merging MergePolicy
mergePolicy NominalDebt
_ STRef s NominalCredit
_ MergingRun LevelMergeType s
mr -> do
          Buffer
r <- MergingRun LevelMergeType s -> ST s Buffer
forall t s. HasCallStack => MergingRun t s -> ST s Buffer
expectCompletedMergingRun MergingRun LevelMergeType s
mr
          Tracer (ST s) EventDetail -> EventDetail -> ST s ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith Tracer (ST s) EventDetail
tr' MergeCompletedEvent {
              MergePolicy
mergePolicy :: MergePolicy
mergePolicy :: MergePolicy
mergePolicy,
              mergeType :: LevelMergeType
mergeType = let MergingRun LevelMergeType
mt MergeDebt
_ STRef s MergingRunState
_ = MergingRun LevelMergeType s
mr in LevelMergeType
mt,
              mergeSize :: Int
mergeSize = Buffer -> Int
runSize Buffer
r
            }
          Buffer -> ST s Buffer
forall a. a -> ST s a
forall (m :: * -> *) a. Monad m => a -> m a
return Buffer
r

      let resident :: [Buffer]
resident = Buffer
rBuffer -> [Buffer] -> [Buffer]
forall a. a -> [a] -> [a]
:[Buffer]
rs
      case Int -> Levels s -> UnionLevel s -> MergePolicy
forall s. Int -> [Level s] -> UnionLevel s -> MergePolicy
mergePolicyForLevel Int
ln Levels s
ls UnionLevel s
ul of

        -- If r is still too small for this level then keep it and merge again
        -- with the incoming runs.
        MergePolicy
MergePolicyTiering | Buffer -> Int
tieringRunSizeToLevel Buffer
r Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< Int
ln -> do
          IncomingRun s
ir' <- Tracer (ST s) EventDetail
-> Int
-> MergePolicy
-> LevelMergeType
-> [Buffer]
-> ST s (IncomingRun s)
forall s.
Tracer (ST s) EventDetail
-> Int
-> MergePolicy
-> LevelMergeType
-> [Buffer]
-> ST s (IncomingRun s)
newLevelMerge Tracer (ST s) EventDetail
tr' Int
ln MergePolicy
MergePolicyTiering (Levels s -> LevelMergeType
mergeTypeFor Levels s
ls) ([Buffer]
incoming [Buffer] -> [Buffer] -> [Buffer]
forall a. [a] -> [a] -> [a]
++ [Buffer
r])
          Levels s -> ST s (Levels s)
forall a. a -> ST s a
forall (m :: * -> *) a. Monad m => a -> m a
return (IncomingRun s -> [Buffer] -> Level s
forall s. IncomingRun s -> [Buffer] -> Level s
Level IncomingRun s
ir' [Buffer]
rs Level s -> Levels s -> Levels s
forall a. a -> [a] -> [a]
: Levels s
ls)

        -- This tiering level is now full. We take the completed merged run
        -- (the previous incoming runs), plus all the other runs on this level
        -- as a bundle and move them down to the level below. We start a merge
        -- for the new incoming runs. This level is otherwise empty.
        MergePolicy
MergePolicyTiering | Int -> [Buffer] -> [Buffer] -> Bool
tieringLevelIsFull Int
ln [Buffer]
incoming [Buffer]
resident -> do
          IncomingRun s
ir' <- Tracer (ST s) EventDetail
-> Int
-> MergePolicy
-> LevelMergeType
-> [Buffer]
-> ST s (IncomingRun s)
forall s.
Tracer (ST s) EventDetail
-> Int
-> MergePolicy
-> LevelMergeType
-> [Buffer]
-> ST s (IncomingRun s)
newLevelMerge Tracer (ST s) EventDetail
tr' Int
ln MergePolicy
MergePolicyTiering LevelMergeType
MergeMidLevel [Buffer]
incoming
          Levels s
ls' <- Int -> [Buffer] -> Levels s -> ST s (Levels s)
go (Int
lnInt -> Int -> Int
forall a. Num a => a -> a -> a
+Int
1) [Buffer]
resident Levels s
ls
          Levels s -> ST s (Levels s)
forall a. a -> ST s a
forall (m :: * -> *) a. Monad m => a -> m a
return (IncomingRun s -> [Buffer] -> Level s
forall s. IncomingRun s -> [Buffer] -> Level s
Level IncomingRun s
ir' [] Level s -> Levels s -> Levels s
forall a. a -> [a] -> [a]
: Levels s
ls')

        -- This tiering level is not yet full. We move the completed merged run
        -- into the level proper, and start the new merge for the incoming runs.
        MergePolicy
MergePolicyTiering -> do
          IncomingRun s
ir' <- Tracer (ST s) EventDetail
-> Int
-> MergePolicy
-> LevelMergeType
-> [Buffer]
-> ST s (IncomingRun s)
forall s.
Tracer (ST s) EventDetail
-> Int
-> MergePolicy
-> LevelMergeType
-> [Buffer]
-> ST s (IncomingRun s)
newLevelMerge Tracer (ST s) EventDetail
tr' Int
ln MergePolicy
MergePolicyTiering (Levels s -> LevelMergeType
mergeTypeFor Levels s
ls) [Buffer]
incoming
          Tracer (ST s) EventDetail -> EventDetail -> ST s ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith Tracer (ST s) EventDetail
tr' (Int -> EventDetail
AddRunEvent ([Buffer] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [Buffer]
resident))
          Levels s -> ST s (Levels s)
forall a. a -> ST s a
forall (m :: * -> *) a. Monad m => a -> m a
return (IncomingRun s -> [Buffer] -> Level s
forall s. IncomingRun s -> [Buffer] -> Level s
Level IncomingRun s
ir' [Buffer]
resident Level s -> Levels s -> Levels s
forall a. a -> [a] -> [a]
: Levels s
ls)

        -- The final level is using levelling. If the existing completed merge
        -- run is too large for this level, we promote the run to the next
        -- level and start merging the incoming runs into this (otherwise
        -- empty) level .
        MergePolicy
MergePolicyLevelling | Int -> [Buffer] -> Buffer -> Bool
levellingLevelIsFull Int
ln [Buffer]
incoming Buffer
r -> do
          Bool -> ST s () -> ST s ()
forall a. HasCallStack => Bool -> a -> a
assert ([Buffer] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [Buffer]
rs Bool -> Bool -> Bool
&& Levels s -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null Levels s
ls) (ST s () -> ST s ()) -> ST s () -> ST s ()
forall a b. (a -> b) -> a -> b
$ () -> ST s ()
forall a. a -> ST s a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
          IncomingRun s
ir' <- Tracer (ST s) EventDetail
-> Int
-> MergePolicy
-> LevelMergeType
-> [Buffer]
-> ST s (IncomingRun s)
forall s.
Tracer (ST s) EventDetail
-> Int
-> MergePolicy
-> LevelMergeType
-> [Buffer]
-> ST s (IncomingRun s)
newLevelMerge Tracer (ST s) EventDetail
tr' Int
ln MergePolicy
MergePolicyTiering LevelMergeType
MergeMidLevel [Buffer]
incoming
          Levels s
ls' <- Int -> [Buffer] -> Levels s -> ST s (Levels s)
go (Int
lnInt -> Int -> Int
forall a. Num a => a -> a -> a
+Int
1) [Buffer
r] []
          Levels s -> ST s (Levels s)
forall a. a -> ST s a
forall (m :: * -> *) a. Monad m => a -> m a
return (IncomingRun s -> [Buffer] -> Level s
forall s. IncomingRun s -> [Buffer] -> Level s
Level IncomingRun s
ir' [] Level s -> Levels s -> Levels s
forall a. a -> [a] -> [a]
: Levels s
ls')

        -- Otherwise we start merging the incoming runs into the run.
        MergePolicy
MergePolicyLevelling -> do
          Bool -> ST s () -> ST s ()
forall a. HasCallStack => Bool -> a -> a
assert ([Buffer] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [Buffer]
rs Bool -> Bool -> Bool
&& Levels s -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null Levels s
ls) (ST s () -> ST s ()) -> ST s () -> ST s ()
forall a b. (a -> b) -> a -> b
$ () -> ST s ()
forall a. a -> ST s a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
          IncomingRun s
ir' <- Tracer (ST s) EventDetail
-> Int
-> MergePolicy
-> LevelMergeType
-> [Buffer]
-> ST s (IncomingRun s)
forall s.
Tracer (ST s) EventDetail
-> Int
-> MergePolicy
-> LevelMergeType
-> [Buffer]
-> ST s (IncomingRun s)
newLevelMerge Tracer (ST s) EventDetail
tr' Int
ln MergePolicy
MergePolicyLevelling (Levels s -> LevelMergeType
mergeTypeFor Levels s
ls)
                          ([Buffer]
incoming [Buffer] -> [Buffer] -> [Buffer]
forall a. [a] -> [a] -> [a]
++ [Buffer
r])
          Levels s -> ST s (Levels s)
forall a. a -> ST s a
forall (m :: * -> *) a. Monad m => a -> m a
return (IncomingRun s -> [Buffer] -> Level s
forall s. IncomingRun s -> [Buffer] -> Level s
Level IncomingRun s
ir' [] Level s -> Levels s -> Levels s
forall a. a -> [a] -> [a]
: [])

      where
        tr' :: Tracer (ST s) EventDetail
tr' = (EventDetail -> Event)
-> Tracer (ST s) Event -> Tracer (ST s) EventDetail
forall a' a. (a' -> a) -> Tracer (ST s) a -> Tracer (ST s) a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
contramap (Int -> Int -> EventDetail -> Event
forall e. Int -> Int -> e -> EventAt e
EventAt Int
sc Int
ln) Tracer (ST s) Event
tr

-- | Create the incoming run for level @level@ from the given runs.
--
-- A single run needs no merging and is passed on unchanged as a 'Single'.
-- Otherwise a new 'MergingRun' of the given 'LevelMergeType' is started over
-- the runs, a 'MergeStartedEvent' is traced, and the merge is wrapped in
-- 'Merging' together with its nominal debt and a fresh nominal credit counter
-- starting at 0.
newLevelMerge :: Tracer (ST s) EventDetail
              -> Int -> MergePolicy -> LevelMergeType
              -> [Run] -> ST s (IncomingRun s)
newLevelMerge _ _ _ _ [r] = return (Single r)
newLevelMerge tr level mergePolicy mergeType rs = do
    -- a level merge is only ever started over 4 or 5 runs
    assertST (length rs `elem` [4, 5])
    mergingRun@(MergingRun _ physicalDebt _) <- newMergingRun mergeType rs
    -- the actual work must stay within the worst case computed below
    assertST (totalDebt physicalDebt <= maxPhysicalDebt)
    traceWith tr MergeStartedEvent {
                   mergePolicy,
                   mergeType,
                   mergeDebt     = totalDebt physicalDebt,
                   mergeRunsSize = map runSize rs
                 }
    nominalCreditVar <- newSTRef (NominalCredit 0)
    pure (Merging mergePolicy nominalDebt nominalCreditVar mergingRun)
  where
    -- The nominal debt equals the minimum of credits we will supply before we
    -- expect the merge to complete. This is the same as the number of updates
    -- in a run that gets moved to this level.
    nominalDebt :: NominalDebt
    nominalDebt = NominalDebt (tieringRunSize level)

    -- The physical debt is the number of actual merge steps we will need to
    -- perform before the merge is complete. This is always the sum of the
    -- lengths of the input runs.
    --
    -- As we supply nominal credit, we scale it and supply physical credits,
    -- such that we pay off the physical and nominal debts at the same time.
    --
    -- We can bound the worst case physical debt: this is the maximum amount of
    -- steps a merge at this level could need. Note that for levelling this
    -- includes the single run in the current level.
    maxPhysicalDebt :: Int
    maxPhysicalDebt =
      case mergePolicy of
        MergePolicyLevelling -> 4 * tieringRunSize (level-1)
                                  + levellingRunSize level
        MergePolicyTiering   -> length rs * tieringRunSize (level-1)

-- | Only based on run count, not their sizes: a tiering level is considered
-- full once it holds at least 4 resident runs. The level number and the
-- incoming runs are not consulted.
tieringLevelIsFull :: Int -> [Run] -> [Run] -> Bool
tieringLevelIsFull _ln _incoming resident = length resident >= 4

-- | The level is only considered full once the resident run is /too large/ for
-- the level, i.e. when 'levellingRunSizeToLevel' maps the resident run to a
-- level number greater than @ln@. The incoming runs are not consulted.
levellingLevelIsFull :: Int -> [Run] -> Run -> Bool
levellingLevelIsFull ln _incoming resident =
    levellingRunSizeToLevel resident > ln

-------------------------------------------------------------------------------
-- MergingTree abstraction
--

-- Note [Table Unions]
-- ~~~~~~~~~~~~~~~~~~~
--
-- Semantically, tables are key-value stores like Haskell's @Map@. Table unions
-- then behave like @Map.unionWith (<>)@. If one of the input tables contains
-- a value at a particular key, the result will also contain it. If multiple
-- tables share that key, the values will be combined monoidally (using
-- 'resolveValue' in this prototype).
--
-- Looking at the implementation, tables are not just key-value pairs, but
-- consist of runs. If each table was just a single run, unioning would involve
-- a run merge similar to the one used for compaction (when a level is full),
-- but with a different merge type 'MergeUnion' that differs semantically: here,
-- runs don't represent updates (overwriting each other), but they each
-- represent the full state of a table. There is no distinction between no entry
-- and a 'Delete', nor between an 'Insert' and a 'Mupsert'.
--
-- To union two tables, we can therefore first merge down each table into a
-- single run (using regular level merges) and then union merge these.
--
-- However, we want to spread out the work required and perform these merges
-- incrementally. At first, we only create a new table that is empty except for
-- a data structure 'MergingTree', representing the merges that need to be done.
-- The usual operations can then be performed on the table while the merge is
-- in progress: Inserts go into the table as usual, not affecting its last level
-- ('UnionLevel'), lookups need to consider the tree (requiring some complexity
-- and runtime overhead), further unions incorporate the in-progress tree into
-- the resulting one, which also shares future merging work.
--
-- It seems necessary to represent the suspended merges using a tree. Other
-- approaches don't allow for full sharing of the incremental work (e.g. because
-- they effectively \"re-bracket\" nested unions). It also seems necessary to
-- first merge each input table into a single run, as there is no practical
-- distributive property between level and union merges.

-- | Ensures that the merge contains more than one input, avoiding creating a
-- pending merge where possible:
--
-- * with no incoming runs, the (optional) tree is returned unchanged;
-- * a lone 'Single' run without a tree becomes a 'CompletedTreeMerge';
-- * otherwise a 'PendingLevelMerge' is created over all the runs (converted to
--   'PreExistingRun's), with the optional tree as an additional input.
newPendingLevelMerge :: [IncomingRun s]
                     -> Maybe (MergingTree s)
                     -> ST s (Maybe (MergingTree s))
newPendingLevelMerge [] t = return t
newPendingLevelMerge [Single r] Nothing =
    Just . MergingTree <$> newSTRef (CompletedTreeMerge r)
newPendingLevelMerge [Merging{}] Nothing =
    -- This case should never occur. If there is a single entry in the list,
    -- there can only be one level in the input table. At level 1 there are no
    -- merging runs, so it must be a 'Single' run.
    error "newPendingLevelMerge: singleton Merging run"
newPendingLevelMerge irs tree = do
    let prs = map incomingToPreExistingRun irs
        st  = PendingTreeMerge (PendingLevelMerge prs tree)
    Just . MergingTree <$> newSTRef st
  where
    -- an in-progress level merge is shared with the new tree, not copied
    incomingToPreExistingRun (Single         r) = PreExistingRun r
    incomingToPreExistingRun (Merging _ _ _ mr) = PreExistingMergingRun mr

-- | Ensures that the merge contains more than one input: no trees yield
-- 'Nothing', a single tree is returned as-is, and only two or more trees are
-- combined into a new 'PendingUnionMerge'.
newPendingUnionMerge :: [MergingTree s] -> ST s (Maybe (MergingTree s))
newPendingUnionMerge []  = return Nothing
newPendingUnionMerge [t] = return (Just t)
newPendingUnionMerge trees = do
    let st = PendingTreeMerge (PendingUnionMerge trees)
    Just . MergingTree <$> newSTRef st

-- | Turn the content of a table into an (optional) merging tree.
--
-- The write buffer (if non-empty) and, for every level, its incoming run
-- followed by its resident runs become the inputs of a pending level merge.
-- The table's existing union tree (if any) is passed as the additional tree
-- input. See 'newPendingLevelMerge' for how trivial cases are avoided.
contentToMergingTree :: LSMContent s -> ST s (Maybe (MergingTree s))
contentToMergingTree (LSMContent wb ls ul) =
    newPendingLevelMerge (buffers ++ levels) trees
  where
    -- flush the write buffer (but this should not modify the content)
    buffers
      | bufferSize wb == 0 = []
      | otherwise          = [Single (bufferToRun wb)]

    -- each level contributes its incoming run, then its resident runs
    levels = concatMap (\(Level ir rs) -> ir : map Single rs) ls

    trees = case ul of
        NoUnion   -> Nothing
        Union t _ -> Just t

-- | When calculating (an upper bound of) the total debt of a recursive tree of
-- merges, we also need to return an upper bound on the size of the resulting
-- run. See 'remainingDebtPendingMerge'.
type Size = Int

-- | The remaining debt of a merging tree, together with an upper bound on the
-- size of the run it will produce.
--
-- A completed tree has no debt left. Ongoing and pending merges carry one unit
-- of debt on top of that of their merging run or pending merge respectively
-- (see the note on @addDebtOne@ below).
remainingDebtMergingTree :: MergingTree s -> ST s (Debt, Size)
remainingDebtMergingTree (MergingTree ref) =
    readSTRef ref >>= \case
      CompletedTreeMerge r -> return (0, runSize r)
      OngoingTreeMerge mr  -> addDebtOne <$> remainingDebtMergingRun mr
      PendingTreeMerge pm  -> addDebtOne <$> remainingDebtPendingMerge pm
  where
    -- An ongoing merge should never have 0 debt, even if the 'MergingRun' in it
    -- says it is completed. We still need to update it to 'CompletedTreeMerge'.
    -- Similarly, a pending merge needs some work to complete it, even if all
    -- its inputs are empty.
    --
    -- Note that we can't use @max 1@, as this would violate the property that
    -- supplying N credits reduces the remaining debt by at least N.
    addDebtOne (debt, size) = (debt + 1, size)

-- | The remaining debt of a pending merge, together with an upper bound on the
-- size of the run it will produce.
--
-- The debt is the remaining debt of all inputs (pre-existing runs and child
-- trees) plus the work of the merge that still has to be started once they are
-- complete. That merge needs as many steps as the sum of its input sizes, for
-- which the sum of the inputs' size bounds is an upper bound.
remainingDebtPendingMerge :: PendingMerge s -> ST s (Debt, Size)
remainingDebtPendingMerge (PendingMerge _ prs trees) = do
    (debts, sizes) <- unzip . concat <$> sequence
        [ traverse remainingDebtPreExistingRun prs
        , traverse remainingDebtMergingTree trees
        ]
    -- named 'debt' rather than 'totalDebt' to avoid shadowing the top-level
    -- 'totalDebt'
    let size = sum sizes
        debt = sum debts + size
    return (debt, size)
  where
    remainingDebtPreExistingRun = \case
        PreExistingRun         r -> return (0, runSize r)
        PreExistingMergingRun mr -> remainingDebtMergingRun mr

-- | The remaining debt of a merging run, together with the size of the run it
-- produces.
--
-- A completed merge has no debt left and reports the size of its result. For
-- an ongoing merge the debt is computed by 'mergeDebtLeft' from the merge's
-- debt and its current credit, and the size is the sum of the input run sizes.
remainingDebtMergingRun :: MergingRun t s -> ST s (Debt, Size)
remainingDebtMergingRun (MergingRun _ d ref) =
    readSTRef ref >>= \case
      CompletedMerge r ->
        return (0, runSize r)
      OngoingMerge c inputRuns _ ->
        return (mergeDebtLeft d c, sum (map runSize inputRuns))

-- | For each of the @supplyCredits@ type functions, we want to check some
-- common properties.
--
-- Wraps a credit-supplying function. It asserts that a positive number of
-- credits is supplied and that debts are never negative. It also asserts that
-- the leftover credits lie between 0 and the supplied amount, and that the
-- debt dropped by at least the number of credits actually spent. The leftover
-- credits are returned unchanged.
checked :: HasCallStack
        => (a -> ST s (Debt, Size))  -- ^ how to calculate the current debt
        -> (Credit -> a -> ST s Credit)  -- ^ how to supply the credits
        -> Credit -> a -> ST s Credit
checked query supply credits x = do
    assertST $ credits > 0   -- only call them when there are credits to spend
    debt <- fst <$> query x
    assertST $ debt >= 0     -- debt can't be negative
    c' <- supply credits x
    assertST $ c' <= credits -- can't have more leftovers than we started with
    assertST $ c' >= 0       -- leftovers can't be negative
    debt' <- fst <$> query x
    assertST $ debt' >= 0
    -- the debt was reduced sufficiently (amount of credits spent)
    assertST $ debt' <= debt - (credits - c')
    return c'

-- | Supply credits to a merging tree and return the leftover credits.
--
-- The tree's state is read, advanced with 'supplyCreditsMergingTreeState', and
-- the new state is written back to the tree's 'STRef'. The whole operation is
-- wrapped in 'checked' to verify the debt/credit accounting.
supplyCreditsMergingTree :: Credit -> MergingTree s -> ST s Credit
supplyCreditsMergingTree =
    checked remainingDebtMergingTree $ \credits (MergingTree ref) -> do
      treeState <- readSTRef ref
      (!c', !treeState') <- supplyCreditsMergingTreeState credits treeState
      writeSTRef ref treeState'
      return c'

-- | Supply credits to a merging tree state, returning the leftover credits and
-- the new state.
--
-- * A completed merge cannot use any credits; all of them are returned.
-- * An ongoing merge passes the credits to its merging run. Only when credits
--   are left over is the run expected to be complete, and the state becomes a
--   'CompletedTreeMerge'. An exactly completed run stays ongoing until the
--   next credit arrives.
-- * A pending merge first passes the credits to its children. When credits are
--   left over, all children must be complete. The merge of their results is
--   then created (unless there is just one result, which completes the tree),
--   and the remaining credits are used to progress it.
supplyCreditsMergingTreeState :: Credit -> MergingTreeState s
                              -> ST s (Credit, MergingTreeState s)
supplyCreditsMergingTreeState credits !state = do
    assertST (credits >= 0)
    case state of
      CompletedTreeMerge{} ->
        return (credits, state)
      OngoingTreeMerge mr -> do
        c' <- supplyCreditsMergingRun credits mr
        if c' <= 0
          then return (0, state)
          else do
            r <- expectCompletedMergingRun mr
            -- all work is done, we can't spend any more credits
            return (c', CompletedTreeMerge r)
      PendingTreeMerge pm -> do
        c' <- supplyCreditsPendingMerge credits pm
        if c' <= 0
          then
            -- still remaining work in children, we can't do more for now
            return (c', state)
          else do
            -- all children must be done, create new merge!
            (mergeType, rs) <- expectCompletedChildren pm
            case rs of
              [r] -> return (c', CompletedTreeMerge r)
              _   -> do
                state' <- OngoingTreeMerge <$> newMergingRun mergeType rs
                -- use any remaining credits to progress the new merge
                supplyCreditsMergingTreeState c' state'

-- | Supply credits to the children of a pending merge, returning the credits
-- that could not be used.
--
-- For a level merge, credits are handed out left to right: first to the
-- pre-existing runs (only the ones that are still merging runs can absorb
-- any), then to the optional merging tree. For a union merge, credits are
-- split approximately equally between the child trees, and any leftovers are
-- then handed out left to right.
--
-- The whole operation is wrapped in 'checked', with
-- 'remainingDebtPendingMerge' as the debt measure.
supplyCreditsPendingMerge :: Credit -> PendingMerge s -> ST s Credit
supplyCreditsPendingMerge = checked remainingDebtPendingMerge $ \credits -> \case
    PendingLevelMerge prs tree ->
      leftToRight supplyPreExistingRun prs credits
        >>= leftToRight supplyCreditsMergingTree (toList tree)
    PendingUnionMerge trees ->
      splitEqually supplyCreditsMergingTree trees credits
  where
    -- A plain pre-existing run absorbs no credits; a merging run is given
    -- the credits and reports what it did not use.
    supplyPreExistingRun c = \case
        PreExistingRun        _r -> return c
        PreExistingMergingRun mr -> supplyCreditsMergingRun c mr

    -- supply credits left to right until they are used up
    leftToRight :: (Credit -> a -> ST s Credit) -> [a] -> Credit -> ST s Credit
    leftToRight _ _      0 = return 0
    leftToRight _ []     c = return c
    leftToRight f (x:xs) c = f c x >>= leftToRight f xs

    -- approximately equal, being more precise would require more iterations
    splitEqually :: (Credit -> a -> ST s Credit) -> [a] -> Credit -> ST s Credit
    -- With no children nothing can absorb credits, so all of them are left
    -- over (matching 'leftToRight'). This equation must come first: the
    -- strict bindings of n and k below would otherwise divide by zero.
    splitEqually _ [] credits = return credits
    splitEqually f xs credits =
        -- first give each tree k = ceil(credits/n) credits (last ones might
        -- get less). it's important we fold here to collect leftovers.
        -- any remainders go left to right.
        foldM supply credits xs >>= leftToRight f xs
      where
        !n = length xs
        !k = (credits + (n - 1)) `div` n

        supply 0 _ = return 0
        supply c t = do
            let creditsToSpend = min k c
            leftovers <- f creditsToSpend t
            return (c - creditsToSpend + leftovers)

-- | Return the merge type of a pending merge together with the completed runs
-- of all its children: the pre-existing runs first, then the results of the
-- child merging trees. Every child is expected to be complete already; that
-- expectation is enforced by 'expectCompletedMergingRun' and
-- 'expectCompletedMergingTree'.
expectCompletedChildren :: HasCallStack
                        => PendingMerge s -> ST s (TreeMergeType, [Run])
expectCompletedChildren (PendingMerge mt prs trees) =
    (\fromRuns fromTrees -> (mt, fromRuns ++ fromTrees))
      <$> traverse completedRun prs
      <*> traverse expectCompletedMergingTree trees
  where
    completedRun (PreExistingRun r)         = return r
    completedRun (PreExistingMergingRun mr) = expectCompletedMergingRun mr

-- | Return the final run of a merging tree that is expected to be complete.
-- Completion is checked with 'isCompletedMergingTree' and enforced through
-- 'expectInvariant'.
expectCompletedMergingTree :: HasCallStack => MergingTree s -> ST s Run
expectCompletedMergingTree tree = expectInvariant (isCompletedMergingTree tree)

-------------------------------------------------------------------------------
-- Measurements
--

-- | An immutable snapshot of the shape of a merging tree.
data MTree r
  = -- | A single run, or a value derived from one (such as its size).
    MLeaf r
  | -- | A merge of the given type over its children.
    MNode TreeMergeType [MTree r]
  deriving stock (Show, Eq, Functor, Foldable)

-- | Snapshot the runs of an LSM: the write buffer, the runs of every level
-- (see 'flattenLevels'), and the union level's merging tree flattened to an
-- 'MTree', or 'Nothing' if there is no union level.
allLevels :: LSM s -> ST s (Buffer, [[Run]], Maybe (MTree Run))
allLevels (LSMHandle _ lsmr) = do
    LSMContent buffer levels unionLevel <- readSTRef lsmr
    levelRuns <- flattenLevels levels
    unionTree <- flattenUnionLevel unionLevel
    pure (buffer, levelRuns, unionTree)
  where
    flattenUnionLevel NoUnion     = pure Nothing
    flattenUnionLevel (Union t _) = Just <$> flattenTree t

-- | The runs of each level, in level order (see 'flattenLevel').
flattenLevels :: Levels s -> ST s [[Run]]
flattenLevels levels = traverse flattenLevel levels

-- | The runs of a single level: those of its incoming run (see
-- 'flattenIncomingRun'), followed by the level's other runs.
flattenLevel :: Level s -> ST s [Run]
flattenLevel (Level incoming resident) = do
    incomingRuns <- flattenIncomingRun incoming
    pure (incomingRuns ++ resident)

-- | The runs held by an incoming run: the run itself if it is a single run,
-- otherwise whatever its merging run currently holds (see
-- 'flattenMergingRun').
flattenIncomingRun :: IncomingRun s -> ST s [Run]
flattenIncomingRun (Single r)         = pure [r]
flattenIncomingRun (Merging _ _ _ mr) = flattenMergingRun mr

-- | The runs held by a merging run: its single result once the merge has
-- completed, or the input runs while the merge is still ongoing.
flattenMergingRun :: MergingRun t s -> ST s [Run]
flattenMergingRun (MergingRun _ _ stateRef) =
    readSTRef stateRef >>= \case
      CompletedMerge r        -> pure [r]
      OngoingMerge _ inputs _ -> pure inputs

-- | Snapshot a merging tree as an 'MTree'.
--
-- * A completed tree, or an ongoing tree merge whose run has completed,
--   becomes a leaf holding the result.
-- * An ongoing tree merge that is still running becomes a node of its merge
--   type whose leaves are the merge's input runs.
-- * A pending tree merge becomes a node of its merge type whose children are
--   the runs of its pre-existing runs (as leaves, see
--   'flattenPreExistingRun'), followed by its flattened child trees.
flattenTree :: MergingTree s -> ST s (MTree Run)
flattenTree (MergingTree treeRef) =
    readSTRef treeRef >>= \case
      CompletedTreeMerge r -> pure (MLeaf r)
      OngoingTreeMerge (MergingRun mt _ runStateRef) ->
        readSTRef runStateRef >>= \case
          CompletedMerge r        -> pure (MLeaf r)
          OngoingMerge _ inputs _ -> pure (MNode mt (map MLeaf inputs))
      PendingTreeMerge (PendingMerge mt preExisting subtrees) -> do
        preExistingRuns <- traverse flattenPreExistingRun preExisting
        subtreeShapes   <- traverse flattenTree subtrees
        pure (MNode mt (map MLeaf (concat preExistingRuns) ++ subtreeShapes))

-- | The runs held by a pre-existing run: the run itself, or whatever its
-- merging run currently holds (see 'flattenMergingRun').
flattenPreExistingRun :: PreExistingRun s -> ST s [Run]
flattenPreExistingRun (PreExistingRun r)         = pure [r]
flattenPreExistingRun (PreExistingMergingRun mr) = flattenMergingRun mr

-- | The logical contents of an LSM, as a map from key to value and optional
-- blob.
--
-- The union tree (if any) is first collapsed into a single run by merging
-- bottom-up with each node's own merge type. That run is placed after the
-- write buffer and all level runs, and everything is merged with
-- 'MergeLevel'. Deletes are then dropped, and a remaining 'Mupsert' is
-- reported as its value with no blob.
logicalValue :: LSM s -> ST s (Map Key (Value, Maybe Blob))
logicalValue lsm = do
    (buffer, levelRuns, mtree) <- allLevels lsm
    let unionRuns = maybe [] (\t -> [collapse t]) mtree
        combined  = mergek MergeLevel (buffer : concat levelRuns ++ unionRuns)
    pure (Map.mapMaybe visible combined)
  where
    collapse :: MTree Run -> Run
    collapse (MLeaf r)           = r
    collapse (MNode mt children) = mergek mt (map collapse children)

    visible :: Update a b -> Maybe (a, Maybe b)
    visible = \case
      Insert v b -> Just (v, b)
      Delete     -> Nothing
      Mupsert v  -> Just (v, Nothing)

-- | A snapshot of an LSM's structure, as built by 'dumpRepresentation': the
-- write buffer, one 'LevelRepresentation' per level, and the union level's
-- merging tree flattened to an 'MTree' ('Nothing' when there is no union
-- level).
type Representation = (Run, [LevelRepresentation], Maybe (MTree Run))

-- | A snapshot of one level, as built by 'dumpLevel'.
--
-- The first component is 'Nothing' when the level's incoming run is a single
-- run; that run is then the first element of the run list. Otherwise it holds
-- the incoming merge's policy, nominal debt, current nominal credit, merge
-- type and current merging-run state. The second component is the list of the
-- level's runs.
type LevelRepresentation =
    (Maybe (MergePolicy, NominalDebt, NominalCredit,
            LevelMergeType, MergingRunState),
     [Run])

-- | Read out the current structure of an LSM: the write buffer, each level
-- (see 'dumpLevel'), and the union level's merging tree flattened with
-- 'flattenTree' ('Nothing' when there is no union level).
dumpRepresentation :: LSM s -> ST s Representation
dumpRepresentation (LSMHandle _ lsmr) = do
    LSMContent wb ls ul <- readSTRef lsmr
    (,,) wb <$> traverse dumpLevel ls <*> dumpUnion ul
  where
    dumpUnion NoUnion     = return Nothing
    dumpUnion (Union t _) = Just <$> flattenTree t

-- | Read out the current structure of one level.
--
-- A single incoming run is prepended to the level's runs and there is no
-- merge information. For an incoming merge, the merging-run state and the
-- nominal credit are read, and they are reported together with the policy,
-- nominal debt and merge type, alongside the level's other runs.
dumpLevel :: Level s -> ST s LevelRepresentation
dumpLevel (Level incoming resident) = case incoming of
    Single r ->
      return (Nothing, r : resident)
    Merging policy debt creditRef (MergingRun mtype _ stateRef) -> do
      state  <- readSTRef stateRef
      credit <- readSTRef creditRef
      return (Just (policy, debt, credit, mtype, state), resident)

-- | Summarise a 'Representation' as run sizes (computed with 'runSize').
--
-- The result holds the size of the write buffer, one pair of size lists per
-- level, and the union tree (if any) with every run replaced by its size.
-- For each level the pair holds:
--
-- 1. the runs involved in an ongoing merge
-- 2. the other runs (including completed merge)
representationShape :: Representation
                    -> (Int, [([Int], [Int])], Maybe (MTree Int))
representationShape (wb, levels, tree) =
    ( runSize wb
    , map shapeLevel levels
    , fmap runSize <$> tree
    )
  where
    shapeLevel (mmr, rs) = (mergeInputs, mergeResult ++ map runSize rs)
      where
        (mergeInputs, mergeResult) = case mmr of
          Nothing                                    -> ([], [])
          Just (_, _, _, _, CompletedMerge r)        -> ([], [runSize r])
          Just (_, _, _, _, OngoingMerge _ inputs _) -> (map runSize inputs, [])

-------------------------------------------------------------------------------
-- Tracing
--

-- TODO: these events are incomplete, in particular we should also trace what
-- happens in the union level.
-- | A trace event, carrying an 'EventDetail' payload.
type Event = EventAt EventDetail

-- | Wraps a trace payload with the position at which it was emitted.
data EventAt e = EventAt {
                   -- | The step counter value when the event was emitted.
                   eventAtStep  :: Counter,
                   -- | The level the event concerns.
                   eventAtLevel :: Int,
                   -- | The event-specific payload.
                   eventDetail  :: e
                 }
  deriving stock Show

-- | The kinds of trace events.
--
-- Note that 'mergeDebt', 'mergeRunsSize', 'runsAtLevel' and 'mergeSize' are
-- each defined on only one constructor, so using them as selectors on the
-- other constructors is partial.
data EventDetail =
       -- | A level was added.
       AddLevelEvent
       -- | A run was added to a level.
     | AddRunEvent {
         -- | The number of runs at the level.
         runsAtLevel   :: Int
       }
       -- | A merge was started.
     | MergeStartedEvent {
         -- | The merge policy of the new merge.
         mergePolicy   :: MergePolicy,
         -- | Whether this is a mid-level or last-level merge.
         mergeType     :: LevelMergeType,
         -- | The debt (work to be done) of the new merge.
         mergeDebt     :: Debt,
         -- | The sizes of the runs being merged.
         mergeRunsSize :: [Int]
       }
       -- | A merge was completed.
     | MergeCompletedEvent {
         mergePolicy :: MergePolicy,
         mergeType   :: LevelMergeType,
         -- | The size of the resulting run.
         mergeSize   :: Int
       }
  deriving stock Show

-------------------------------------------------------------------------------
-- Arbitrary
--

-- | Keys wrap natural numbers whose magnitude is bounded by the QuickCheck
-- size parameter. Shrinking shrinks the wrapped 'Int'.
instance QC.Arbitrary Key where
  arbitrary = K <$> QC.arbitrarySizedNatural
  shrink (K v) = K <$> QC.shrink v

-- | Values wrap natural numbers whose magnitude is bounded by the QuickCheck
-- size parameter. Shrinking shrinks the wrapped 'Int'.
instance QC.Arbitrary Value where
  arbitrary = V <$> QC.arbitrarySizedNatural
  shrink (V v) = V <$> QC.shrink v

-- | Blobs wrap natural numbers whose magnitude is bounded by the QuickCheck
-- size parameter. Shrinking shrinks the wrapped 'Int'.
instance QC.Arbitrary Blob where
  arbitrary = B <$> QC.arbitrarySizedNatural
  shrink (B v) = B <$> QC.shrink v

-- | Generates updates with weights 3:1:1, i.e. roughly 60% 'Insert'
-- (with an optional blob), 20% 'Mupsert' and 20% 'Delete'.
-- No 'shrink' is defined, so the default (no shrinking) applies.
instance (QC.Arbitrary v, QC.Arbitrary b) => QC.Arbitrary (Update v b) where
  arbitrary = QC.frequency
      [ (3, Insert <$> QC.arbitrary <*> QC.arbitrary)
      , (1, Mupsert <$> QC.arbitrary)
      , (1, pure Delete)
      ]

-- | Picks uniformly between the two level merge types.
instance QC.Arbitrary LevelMergeType where
  arbitrary = QC.elements [MergeMidLevel, MergeLastLevel]

-- | Picks uniformly between the two tree merge types.
instance QC.Arbitrary TreeMergeType where
  arbitrary = QC.elements [MergeLevel, MergeUnion]