{-# LANGUAGE CPP           #-}
{-# LANGUAGE MagicHash     #-}
{-# LANGUAGE UnboxedTuples #-}

#if !(MIN_VERSION_GLASGOW_HASKELL(9,0,0,0))
-- Fix for ghc 8.10.x with deriving newtype Prim
{-# LANGUAGE DataKinds     #-}
#endif

module Database.LSMTree.Internal.IncomingRun (
    IncomingRun (..)
  , MergePolicyForLevel (..)
  , duplicateIncomingRun
  , releaseIncomingRun
  , newIncomingSingleRun
  , newIncomingMergingRun
  , snapshotIncomingRun

    -- * Credits and credit tracking
    -- $credittracking
  , NominalDebt (..)
  , NominalCredits (..)
  , nominalDebtAsCredits
  , supplyCreditsIncomingRun
  , immediatelyCompleteIncomingRun
  ) where

import           Control.Concurrent.Class.MonadMVar.Strict
import           Control.DeepSeq (NFData (..))
import           Control.Monad.Class.MonadST (MonadST)
import           Control.Monad.Class.MonadSTM (MonadSTM (..))
import           Control.Monad.Class.MonadThrow (MonadMask, MonadThrow (..))
import           Control.Monad.Primitive
import           Control.RefCount
import           Data.Primitive (Prim)
import           Data.Primitive.PrimVar
import           Database.LSMTree.Internal.Assertions (assert)
import           Database.LSMTree.Internal.Config
import           Database.LSMTree.Internal.Entry (NumEntries (..))
import           Database.LSMTree.Internal.MergingRun (MergeCredits (..),
                     MergeDebt (..), MergingRun)
import qualified Database.LSMTree.Internal.MergingRun as MR
import           Database.LSMTree.Internal.Run (Run)

import           GHC.Exts (Word (W#), quotRemWord2#, timesWord2#)

{-------------------------------------------------------------------------------
  Incoming runs
-------------------------------------------------------------------------------}

-- | An incoming run is either a single run, or a merge.
data IncomingRun m h =
       Single  !(Ref (Run m h))
     | Merging !MergePolicyForLevel
               !NominalDebt
               !(PrimVar (PrimState m) NominalCredits)
               !(Ref (MergingRun MR.LevelMergeType m h))

data MergePolicyForLevel = LevelTiering | LevelLevelling
  deriving stock (Int -> MergePolicyForLevel -> ShowS
[MergePolicyForLevel] -> ShowS
MergePolicyForLevel -> String
(Int -> MergePolicyForLevel -> ShowS)
-> (MergePolicyForLevel -> String)
-> ([MergePolicyForLevel] -> ShowS)
-> Show MergePolicyForLevel
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> MergePolicyForLevel -> ShowS
showsPrec :: Int -> MergePolicyForLevel -> ShowS
$cshow :: MergePolicyForLevel -> String
show :: MergePolicyForLevel -> String
$cshowList :: [MergePolicyForLevel] -> ShowS
showList :: [MergePolicyForLevel] -> ShowS
Show, MergePolicyForLevel -> MergePolicyForLevel -> Bool
(MergePolicyForLevel -> MergePolicyForLevel -> Bool)
-> (MergePolicyForLevel -> MergePolicyForLevel -> Bool)
-> Eq MergePolicyForLevel
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: MergePolicyForLevel -> MergePolicyForLevel -> Bool
== :: MergePolicyForLevel -> MergePolicyForLevel -> Bool
$c/= :: MergePolicyForLevel -> MergePolicyForLevel -> Bool
/= :: MergePolicyForLevel -> MergePolicyForLevel -> Bool
Eq)

instance NFData MergePolicyForLevel where
  rnf :: MergePolicyForLevel -> ()
rnf MergePolicyForLevel
LevelTiering   = ()
  rnf MergePolicyForLevel
LevelLevelling = ()

{-# SPECIALISE duplicateIncomingRun :: IncomingRun IO h -> IO (IncomingRun IO h) #-}
duplicateIncomingRun ::
     (PrimMonad m, MonadMask m)
  => IncomingRun m h
  -> m (IncomingRun m h)
duplicateIncomingRun :: forall (m :: * -> *) h.
(PrimMonad m, MonadMask m) =>
IncomingRun m h -> m (IncomingRun m h)
duplicateIncomingRun (Single Ref (Run m h)
r) =
    Ref (Run m h) -> IncomingRun m h
forall (m :: * -> *) h. Ref (Run m h) -> IncomingRun m h
Single (Ref (Run m h) -> IncomingRun m h)
-> m (Ref (Run m h)) -> m (IncomingRun m h)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Ref (Run m h) -> m (Ref (Run m h))
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadThrow m,
 ?callStack::CallStack) =>
Ref obj -> m (Ref obj)
dupRef Ref (Run m h)
r

duplicateIncomingRun (Merging MergePolicyForLevel
mp NominalDebt
md PrimVar (PrimState m) NominalCredits
mcv Ref (MergingRun LevelMergeType m h)
mr) =
    MergePolicyForLevel
-> NominalDebt
-> PrimVar (PrimState m) NominalCredits
-> Ref (MergingRun LevelMergeType m h)
-> IncomingRun m h
forall (m :: * -> *) h.
MergePolicyForLevel
-> NominalDebt
-> PrimVar (PrimState m) NominalCredits
-> Ref (MergingRun LevelMergeType m h)
-> IncomingRun m h
Merging MergePolicyForLevel
mp NominalDebt
md (PrimVar (PrimState m) NominalCredits
 -> Ref (MergingRun LevelMergeType m h) -> IncomingRun m h)
-> m (PrimVar (PrimState m) NominalCredits)
-> m (Ref (MergingRun LevelMergeType m h) -> IncomingRun m h)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (NominalCredits -> m (PrimVar (PrimState m) NominalCredits)
forall (m :: * -> *) a.
(PrimMonad m, Prim a) =>
a -> m (PrimVar (PrimState m) a)
newPrimVar (NominalCredits -> m (PrimVar (PrimState m) NominalCredits))
-> m NominalCredits -> m (PrimVar (PrimState m) NominalCredits)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< PrimVar (PrimState m) NominalCredits -> m NominalCredits
forall (m :: * -> *) a.
(PrimMonad m, Prim a) =>
PrimVar (PrimState m) a -> m a
readPrimVar PrimVar (PrimState m) NominalCredits
mcv)
                  m (Ref (MergingRun LevelMergeType m h) -> IncomingRun m h)
-> m (Ref (MergingRun LevelMergeType m h)) -> m (IncomingRun m h)
forall a b. m (a -> b) -> m a -> m b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Ref (MergingRun LevelMergeType m h)
-> m (Ref (MergingRun LevelMergeType m h))
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadThrow m,
 ?callStack::CallStack) =>
Ref obj -> m (Ref obj)
dupRef Ref (MergingRun LevelMergeType m h)
mr

{-# SPECIALISE releaseIncomingRun :: IncomingRun IO h -> IO () #-}
releaseIncomingRun ::
     (PrimMonad m, MonadMask m)
  => IncomingRun m h -> m ()
releaseIncomingRun :: forall (m :: * -> *) h.
(PrimMonad m, MonadMask m) =>
IncomingRun m h -> m ()
releaseIncomingRun (Single         Ref (Run m h)
r) = Ref (Run m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m,
 ?callStack::CallStack) =>
Ref obj -> m ()
releaseRef Ref (Run m h)
r
releaseIncomingRun (Merging MergePolicyForLevel
_ NominalDebt
_ PrimVar (PrimState m) NominalCredits
_ Ref (MergingRun LevelMergeType m h)
mr) = Ref (MergingRun LevelMergeType m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m,
 ?callStack::CallStack) =>
Ref obj -> m ()
releaseRef Ref (MergingRun LevelMergeType m h)
mr

{-# INLINE newIncomingSingleRun #-}
newIncomingSingleRun ::
     (PrimMonad m, MonadThrow m)
  => Ref (Run m h)
  -> m (IncomingRun m h)
newIncomingSingleRun :: forall (m :: * -> *) h.
(PrimMonad m, MonadThrow m) =>
Ref (Run m h) -> m (IncomingRun m h)
newIncomingSingleRun Ref (Run m h)
r = Ref (Run m h) -> IncomingRun m h
forall (m :: * -> *) h. Ref (Run m h) -> IncomingRun m h
Single (Ref (Run m h) -> IncomingRun m h)
-> m (Ref (Run m h)) -> m (IncomingRun m h)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Ref (Run m h) -> m (Ref (Run m h))
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadThrow m,
 ?callStack::CallStack) =>
Ref obj -> m (Ref obj)
dupRef Ref (Run m h)
r

{-# INLINE newIncomingMergingRun #-}
newIncomingMergingRun ::
     (PrimMonad m, MonadThrow m)
  => MergePolicyForLevel
  -> NominalDebt
  -> Ref (MergingRun MR.LevelMergeType m h)
  -> m (IncomingRun m h)
newIncomingMergingRun :: forall (m :: * -> *) h.
(PrimMonad m, MonadThrow m) =>
MergePolicyForLevel
-> NominalDebt
-> Ref (MergingRun LevelMergeType m h)
-> m (IncomingRun m h)
newIncomingMergingRun MergePolicyForLevel
mergePolicy NominalDebt
nominalDebt Ref (MergingRun LevelMergeType m h)
mr = do
    PrimVar (PrimState m) NominalCredits
nominalCreditsVar <- NominalCredits -> m (PrimVar (PrimState m) NominalCredits)
forall (m :: * -> *) a.
(PrimMonad m, Prim a) =>
a -> m (PrimVar (PrimState m) a)
newPrimVar (Int -> NominalCredits
NominalCredits Int
0)
    MergePolicyForLevel
-> NominalDebt
-> PrimVar (PrimState m) NominalCredits
-> Ref (MergingRun LevelMergeType m h)
-> IncomingRun m h
forall (m :: * -> *) h.
MergePolicyForLevel
-> NominalDebt
-> PrimVar (PrimState m) NominalCredits
-> Ref (MergingRun LevelMergeType m h)
-> IncomingRun m h
Merging MergePolicyForLevel
mergePolicy NominalDebt
nominalDebt PrimVar (PrimState m) NominalCredits
nominalCreditsVar (Ref (MergingRun LevelMergeType m h) -> IncomingRun m h)
-> m (Ref (MergingRun LevelMergeType m h)) -> m (IncomingRun m h)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Ref (MergingRun LevelMergeType m h)
-> m (Ref (MergingRun LevelMergeType m h))
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadThrow m,
 ?callStack::CallStack) =>
Ref obj -> m (Ref obj)
dupRef Ref (MergingRun LevelMergeType m h)
mr

{-# SPECIALISE snapshotIncomingRun ::
     IncomingRun IO h
  -> IO (Either (Ref (Run IO h))
                (MergePolicyForLevel,
                 NominalDebt,
                 NominalCredits,
                 Ref (MergingRun MR.LevelMergeType IO h))) #-}
snapshotIncomingRun ::
     PrimMonad m
  => IncomingRun m h
  -> m (Either (Ref (Run m h))
               (MergePolicyForLevel,
                NominalDebt,
                NominalCredits,
                Ref (MergingRun MR.LevelMergeType m h)))
snapshotIncomingRun :: forall (m :: * -> *) h.
PrimMonad m =>
IncomingRun m h
-> m (Either
        (Ref (Run m h))
        (MergePolicyForLevel, NominalDebt, NominalCredits,
         Ref (MergingRun LevelMergeType m h)))
snapshotIncomingRun (Single Ref (Run m h)
r) = Either
  (Ref (Run m h))
  (MergePolicyForLevel, NominalDebt, NominalCredits,
   Ref (MergingRun LevelMergeType m h))
-> m (Either
        (Ref (Run m h))
        (MergePolicyForLevel, NominalDebt, NominalCredits,
         Ref (MergingRun LevelMergeType m h)))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Ref (Run m h)
-> Either
     (Ref (Run m h))
     (MergePolicyForLevel, NominalDebt, NominalCredits,
      Ref (MergingRun LevelMergeType m h))
forall a b. a -> Either a b
Left Ref (Run m h)
r)
snapshotIncomingRun (Merging MergePolicyForLevel
mergePolicy NominalDebt
nominalDebt PrimVar (PrimState m) NominalCredits
nominalCreditsVar Ref (MergingRun LevelMergeType m h)
mr) = do
    NominalCredits
nominalCredits <- PrimVar (PrimState m) NominalCredits -> m NominalCredits
forall (m :: * -> *) a.
(PrimMonad m, Prim a) =>
PrimVar (PrimState m) a -> m a
readPrimVar PrimVar (PrimState m) NominalCredits
nominalCreditsVar
    Either
  (Ref (Run m h))
  (MergePolicyForLevel, NominalDebt, NominalCredits,
   Ref (MergingRun LevelMergeType m h))
-> m (Either
        (Ref (Run m h))
        (MergePolicyForLevel, NominalDebt, NominalCredits,
         Ref (MergingRun LevelMergeType m h)))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ((MergePolicyForLevel, NominalDebt, NominalCredits,
 Ref (MergingRun LevelMergeType m h))
-> Either
     (Ref (Run m h))
     (MergePolicyForLevel, NominalDebt, NominalCredits,
      Ref (MergingRun LevelMergeType m h))
forall a b. b -> Either a b
Right (MergePolicyForLevel
mergePolicy, NominalDebt
nominalDebt, NominalCredits
nominalCredits, Ref (MergingRun LevelMergeType m h)
mr))

{-------------------------------------------------------------------------------
  Credits
-------------------------------------------------------------------------------}

{- $credittracking

With scheduled merges, each update (e.g., insert) on a table contributes to the
progression of ongoing merges in the levels structure. This ensures that merges
are finished in time before a new merge has to be started. The points in the
evolution of the levels structure where new merges are started are known: a
flush of a full write buffer will create a new run on the first level, and
after sufficient flushes (e.g., 4) we will start at least one new merge on the
second level. This may cascade down to lower levels depending on how full the
levels are. As such, we have a well-defined measure to determine when merges
should be finished: it only depends on the maximum size of the write buffer!

The simplest solution to making sure merges are done in time is to step them to
completion immediately when started. This does not, however, spread out work
over time nicely. Instead, we schedule merge work based on how many updates are
made on the table, taking care to ensure that the merge is finished /just/ in
time before the next flush comes around, and not too early.

The progression is tracked using nominal credits. Each individual update
contributes a single credit to each level, since each level contains precisely
one ongoing merge. Contributing a credit does not, however, translate directly
to performing one /unit/ of merging work:

* The amount of work to do for one credit is adjusted depending on the actual
  size of the merge we are doing. Last-level merges, for example, can have
  larger inputs, and therefore we have to do a little more work for each
  credit. Or input runs involved in a merge can be less than maximal size for
  the level, and so there may be less merging work to do. As such, we /scale/
  'NominalCredits' to 'MergeCredits', and then supply the 'MergeCredits' to
  the 'MergingRun'.

* Supplying 'MergeCredits' to a 'MergingRun' does not necessarily directly
  translate into performing merging work. Merge credits are accumulated until
  they go over a threshold, after which a batch of merge work will be performed.
  Configuring this threshold should allow a good balance between spreading out
  I\/O and achieving good (concurrent) performance.

Merging runs can be shared across tables, which means that multiple threads
can contribute to the same merge concurrently. Incoming runs however are /not/
shared between tables. As such the tracking of 'NominalCredits' does not need
to use any concurrency precautions.
-}

-- | Total merge debt to complete the merge in an incoming run.
--
-- This corresponds to the number (worst case, minimum number) of update
-- operations inserted into the table, before we will expect the merge to
-- complete.
newtype NominalDebt = NominalDebt Int
  deriving stock NominalDebt -> NominalDebt -> Bool
(NominalDebt -> NominalDebt -> Bool)
-> (NominalDebt -> NominalDebt -> Bool) -> Eq NominalDebt
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: NominalDebt -> NominalDebt -> Bool
== :: NominalDebt -> NominalDebt -> Bool
$c/= :: NominalDebt -> NominalDebt -> Bool
/= :: NominalDebt -> NominalDebt -> Bool
Eq
  deriving newtype (NominalDebt -> ()
(NominalDebt -> ()) -> NFData NominalDebt
forall a. (a -> ()) -> NFData a
$crnf :: NominalDebt -> ()
rnf :: NominalDebt -> ()
NFData)

-- | Merge credits that get supplied to a table's levels.
--
-- This corresponds to the number of update operations inserted into the table.
newtype NominalCredits = NominalCredits Int
  deriving stock NominalCredits -> NominalCredits -> Bool
(NominalCredits -> NominalCredits -> Bool)
-> (NominalCredits -> NominalCredits -> Bool) -> Eq NominalCredits
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: NominalCredits -> NominalCredits -> Bool
== :: NominalCredits -> NominalCredits -> Bool
$c/= :: NominalCredits -> NominalCredits -> Bool
/= :: NominalCredits -> NominalCredits -> Bool
Eq
  deriving newtype (Addr# -> Int# -> NominalCredits
ByteArray# -> Int# -> NominalCredits
Proxy NominalCredits -> Int#
NominalCredits -> Int#
(Proxy NominalCredits -> Int#)
-> (NominalCredits -> Int#)
-> (Proxy NominalCredits -> Int#)
-> (NominalCredits -> Int#)
-> (ByteArray# -> Int# -> NominalCredits)
-> (forall s.
    MutableByteArray# s
    -> Int# -> State# s -> (# State# s, NominalCredits #))
-> (forall s.
    MutableByteArray# s
    -> Int# -> NominalCredits -> State# s -> State# s)
-> (forall s.
    MutableByteArray# s
    -> Int# -> Int# -> NominalCredits -> State# s -> State# s)
-> (Addr# -> Int# -> NominalCredits)
-> (forall s.
    Addr# -> Int# -> State# s -> (# State# s, NominalCredits #))
-> (forall s.
    Addr# -> Int# -> NominalCredits -> State# s -> State# s)
-> (forall s.
    Addr# -> Int# -> Int# -> NominalCredits -> State# s -> State# s)
-> Prim NominalCredits
forall s.
Addr# -> Int# -> Int# -> NominalCredits -> State# s -> State# s
forall s.
Addr# -> Int# -> State# s -> (# State# s, NominalCredits #)
forall s. Addr# -> Int# -> NominalCredits -> State# s -> State# s
forall s.
MutableByteArray# s
-> Int# -> Int# -> NominalCredits -> State# s -> State# s
forall s.
MutableByteArray# s
-> Int# -> State# s -> (# State# s, NominalCredits #)
forall s.
MutableByteArray# s
-> Int# -> NominalCredits -> State# s -> State# s
forall a.
(Proxy a -> Int#)
-> (a -> Int#)
-> (Proxy a -> Int#)
-> (a -> Int#)
-> (ByteArray# -> Int# -> a)
-> (forall s.
    MutableByteArray# s -> Int# -> State# s -> (# State# s, a #))
-> (forall s.
    MutableByteArray# s -> Int# -> a -> State# s -> State# s)
-> (forall s.
    MutableByteArray# s -> Int# -> Int# -> a -> State# s -> State# s)
-> (Addr# -> Int# -> a)
-> (forall s. Addr# -> Int# -> State# s -> (# State# s, a #))
-> (forall s. Addr# -> Int# -> a -> State# s -> State# s)
-> (forall s. Addr# -> Int# -> Int# -> a -> State# s -> State# s)
-> Prim a
$csizeOfType# :: Proxy NominalCredits -> Int#
sizeOfType# :: Proxy NominalCredits -> Int#
$csizeOf# :: NominalCredits -> Int#
sizeOf# :: NominalCredits -> Int#
$calignmentOfType# :: Proxy NominalCredits -> Int#
alignmentOfType# :: Proxy NominalCredits -> Int#
$calignment# :: NominalCredits -> Int#
alignment# :: NominalCredits -> Int#
$cindexByteArray# :: ByteArray# -> Int# -> NominalCredits
indexByteArray# :: ByteArray# -> Int# -> NominalCredits
$creadByteArray# :: forall s.
MutableByteArray# s
-> Int# -> State# s -> (# State# s, NominalCredits #)
readByteArray# :: forall s.
MutableByteArray# s
-> Int# -> State# s -> (# State# s, NominalCredits #)
$cwriteByteArray# :: forall s.
MutableByteArray# s
-> Int# -> NominalCredits -> State# s -> State# s
writeByteArray# :: forall s.
MutableByteArray# s
-> Int# -> NominalCredits -> State# s -> State# s
$csetByteArray# :: forall s.
MutableByteArray# s
-> Int# -> Int# -> NominalCredits -> State# s -> State# s
setByteArray# :: forall s.
MutableByteArray# s
-> Int# -> Int# -> NominalCredits -> State# s -> State# s
$cindexOffAddr# :: Addr# -> Int# -> NominalCredits
indexOffAddr# :: Addr# -> Int# -> NominalCredits
$creadOffAddr# :: forall s.
Addr# -> Int# -> State# s -> (# State# s, NominalCredits #)
readOffAddr# :: forall s.
Addr# -> Int# -> State# s -> (# State# s, NominalCredits #)
$cwriteOffAddr# :: forall s. Addr# -> Int# -> NominalCredits -> State# s -> State# s
writeOffAddr# :: forall s. Addr# -> Int# -> NominalCredits -> State# s -> State# s
$csetOffAddr# :: forall s.
Addr# -> Int# -> Int# -> NominalCredits -> State# s -> State# s
setOffAddr# :: forall s.
Addr# -> Int# -> Int# -> NominalCredits -> State# s -> State# s
Prim, NominalCredits -> ()
(NominalCredits -> ()) -> NFData NominalCredits
forall a. (a -> ()) -> NFData a
$crnf :: NominalCredits -> ()
rnf :: NominalCredits -> ()
NFData)

nominalDebtAsCredits :: NominalDebt -> NominalCredits
nominalDebtAsCredits :: NominalDebt -> NominalCredits
nominalDebtAsCredits (NominalDebt Int
c) = Int -> NominalCredits
NominalCredits Int
c

{-# SPECIALISE supplyCreditsIncomingRun ::
     TableConfig
  -> LevelNo
  -> IncomingRun IO h
  -> NominalCredits
  -> IO () #-}
-- | Supply a given number of nominal credits to the merge in an incoming run.
-- This is a relative addition of credits, not a new absolute total value.
supplyCreditsIncomingRun ::
     (MonadSTM m, MonadST m, MonadMVar m, MonadMask m)
  => TableConfig
  -> LevelNo
  -> IncomingRun m h
  -> NominalCredits
  -> m ()
supplyCreditsIncomingRun :: forall (m :: * -> *) h.
(MonadSTM m, MonadST m, MonadMVar m, MonadMask m) =>
TableConfig -> LevelNo -> IncomingRun m h -> NominalCredits -> m ()
supplyCreditsIncomingRun TableConfig
_ LevelNo
_ (Single Ref (Run m h)
_r) NominalCredits
_ = () -> m ()
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
supplyCreditsIncomingRun TableConfig
conf LevelNo
ln (Merging MergePolicyForLevel
_ NominalDebt
nominalDebt PrimVar (PrimState m) NominalCredits
nominalCreditsVar Ref (MergingRun LevelMergeType m h)
mr)
                         NominalCredits
deposit = do
    (NominalCredits
_nominalCredits,
     NominalCredits
nominalCredits') <- NominalDebt
-> PrimVar (PrimState m) NominalCredits
-> NominalCredits
-> m (NominalCredits, NominalCredits)
forall (m :: * -> *).
PrimMonad m =>
NominalDebt
-> PrimVar (PrimState m) NominalCredits
-> NominalCredits
-> m (NominalCredits, NominalCredits)
depositNominalCredits NominalDebt
nominalDebt PrimVar (PrimState m) NominalCredits
nominalCreditsVar
                                               NominalCredits
deposit
    let !mergeDebt :: MergeDebt
mergeDebt     = Ref (MergingRun LevelMergeType m h) -> MergeDebt
forall t (m :: * -> *) h. Ref (MergingRun t m h) -> MergeDebt
MR.totalMergeDebt Ref (MergingRun LevelMergeType m h)
mr
        !mergeCredits' :: MergeCredits
mergeCredits' = NominalDebt -> MergeDebt -> NominalCredits -> MergeCredits
scaleNominalToMergeCredit NominalDebt
nominalDebt MergeDebt
mergeDebt
                                                   NominalCredits
nominalCredits'
        !thresh :: CreditThreshold
thresh = TableConfig -> LevelNo -> CreditThreshold
creditThresholdForLevel TableConfig
conf LevelNo
ln
    (MergeCredits
_suppliedCredits,
     MergeCredits
_suppliedCredits') <- Ref (MergingRun LevelMergeType m h)
-> CreditThreshold
-> MergeCredits
-> m (MergeCredits, MergeCredits)
forall t (m :: * -> *) h.
(MonadSTM m, MonadST m, MonadMVar m, MonadMask m) =>
Ref (MergingRun t m h)
-> CreditThreshold
-> MergeCredits
-> m (MergeCredits, MergeCredits)
MR.supplyCreditsAbsolute Ref (MergingRun LevelMergeType m h)
mr CreditThreshold
thresh MergeCredits
mergeCredits'
    () -> m ()
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
    --TODO: currently each supplying credits action results in contributing
    -- credits to the underlying merge, but this need not be the case. We
    -- _could_ do threshold based batching at the level of the IncomingRun.
    -- The IncomingRun does not need to worry about concurrency, so does not
    -- pay the cost of atomic operations on the counters. Then when we
    -- accumulate a batch we could supply that to the MergingRun (which must
    -- use atomic operations for its counters). We could potentially simplify
    -- MergingRun by dispensing with batching for the MergeCredits counters.

-- TODO: the thresholds for doing merge work should be different for each level,
-- maybe co-prime?
creditThresholdForLevel :: TableConfig -> LevelNo -> MR.CreditThreshold
creditThresholdForLevel :: TableConfig -> LevelNo -> CreditThreshold
creditThresholdForLevel TableConfig
conf (LevelNo Int
_i) =
    let AllocNumEntries (NumEntries Int
x) = TableConfig -> WriteBufferAlloc
confWriteBufferAlloc TableConfig
conf
    in  UnspentCredits -> CreditThreshold
MR.CreditThreshold (MergeCredits -> UnspentCredits
MR.UnspentCredits (Int -> MergeCredits
MergeCredits Int
x))

-- | Deposit nominal credits in the local credits var, ensuring the total
-- credits does not exceed the total debt.
--
-- Depositing /could/ leave the credit higher than the total debt. It is not
-- avoided by construction. The scenario is this: when a completed merge is
-- underfull, we combine it with the incoming run, so it means we have one run
-- fewer on the level then we'd normally have. This means that the level
-- becomes full at a later time, so more time passes before we call
-- 'MR.expectCompleted' on any levels further down the tree. This means we keep
-- supplying nominal credits to levels further down past the point their
-- nominal debt is paid off. So the solution here is just to drop any nominal
-- credits that are in excess of the nominal debt.
--
-- This is /not/ itself thread safe. All 'TableContent' update operations are
-- expected to be serialised by the caller. See concurrency comments for
-- 'TableContent' for detail.
depositNominalCredits ::
     PrimMonad m
  => NominalDebt
  -> PrimVar (PrimState m) NominalCredits
  -> NominalCredits
  -> m (NominalCredits, NominalCredits)
depositNominalCredits :: forall (m :: * -> *).
PrimMonad m =>
NominalDebt
-> PrimVar (PrimState m) NominalCredits
-> NominalCredits
-> m (NominalCredits, NominalCredits)
depositNominalCredits (NominalDebt Int
nominalDebt)
                      PrimVar (PrimState m) NominalCredits
nominalCreditsVar
                      (NominalCredits Int
deposit) = do
    NominalCredits Int
before <- PrimVar (PrimState m) NominalCredits -> m NominalCredits
forall (m :: * -> *) a.
(PrimMonad m, Prim a) =>
PrimVar (PrimState m) a -> m a
readPrimVar PrimVar (PrimState m) NominalCredits
nominalCreditsVar
    let !after :: NominalCredits
after = Int -> NominalCredits
NominalCredits (Int -> Int -> Int
forall a. Ord a => a -> a -> a
min (Int
before Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
deposit) Int
nominalDebt)
    PrimVar (PrimState m) NominalCredits -> NominalCredits -> m ()
forall (m :: * -> *) a.
(PrimMonad m, Prim a) =>
PrimVar (PrimState m) a -> a -> m ()
writePrimVar PrimVar (PrimState m) NominalCredits
nominalCreditsVar NominalCredits
after
    (NominalCredits, NominalCredits)
-> m (NominalCredits, NominalCredits)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Int -> NominalCredits
NominalCredits Int
before, NominalCredits
after)

-- | Linearly scale a nominal credit (between 0 and the nominal debt) into an
-- equivalent merge credit (between 0 and the total merge debt).
--
-- Crucially, @100% nominal credit ~~ 100% merge credit@, so when we pay off
-- the nominal debt, we also exactly pay off the merge debt. That is:
--
-- > scaleNominalToMergeCredit nominalDebt mergeDebt nominalDebt == mergeDebt
--
-- (modulo some newtype conversions)
--
scaleNominalToMergeCredit ::
     NominalDebt
  -> MergeDebt
  -> NominalCredits
  -> MergeCredits
scaleNominalToMergeCredit :: NominalDebt -> MergeDebt -> NominalCredits -> MergeCredits
scaleNominalToMergeCredit (NominalDebt             Int
nominalDebt)
                          (MergeDebt (MergeCredits Int
mergeDebt))
                          (NominalCredits          Int
nominalCredits) =
    -- The scaling involves an operation: (a * b) `div` c
    -- but where potentially the variables a,b,c may be bigger than a 32bit
    -- integer can hold. This would be the case for runs that have more than
    -- 4 billion entries.
    --
    -- (This is assuming 64bit Int, the problem would be even worse for 32bit
    -- systems. The solution here would also work for 32bit systems, allowing
    -- up to, 2^31, 2 billion entries per run.)
    --
    -- To work correctly in this case we need higher range for the intermediate
    -- result a*b which could be bigger than 64bits can hold. A correct
    -- implementation can use Rational, but a fast implementation should use
    -- only integer operations. This is relevant because this is on the fast
    -- path for small insertions into the table that often do no merging work
    -- and just update credit counters.

    -- The fast implementation uses integer operations that produce a 128bit
    -- intermediate result for the a*b result, and use a 128bit numerator in
    -- the division operation (but 64bit denominator). These are known as
    -- "widening multiplication" and "narrowing division". GHC has direct
    -- support for these operations as primops: timesWord2# and quotRemWord2#,
    -- but they are not exposed through any high level API shipped with GHC.

    -- The specification using Rational is:
    let mergeCredits_spec :: Int
mergeCredits_spec = Rational -> Int
forall b. Integral b => Rational -> b
forall a b. (RealFrac a, Integral b) => a -> b
floor (Rational -> Int) -> Rational -> Int
forall a b. (a -> b) -> a -> b
$ Int -> Rational
forall a. Real a => a -> Rational
toRational Int
nominalCredits
                                  Rational -> Rational -> Rational
forall a. Num a => a -> a -> a
* Int -> Rational
forall a. Real a => a -> Rational
toRational Int
mergeDebt
                                  Rational -> Rational -> Rational
forall a. Fractional a => a -> a -> a
/ Int -> Rational
forall a. Real a => a -> Rational
toRational Int
nominalDebt
    -- Note that it doesn't matter if we use floor or ceiling here.
    -- Rounding errors will not compound because we sum nominal debt and
    -- convert absolute nominal to absolute merging credit. We don't
    -- convert each deposit and sum all the rounding errors.
    -- When nominalCredits == nominalDebt then the result is exact anyway
    -- (being mergeDebt) so the rounding mode makes no difference when we
    -- get to the end of the merge. Using floor makes things simpler for
    -- the fast integer implementation below, so we take that as the spec.

        -- If the nominalCredits is between 0 and nominalDebt then it's
        -- guaranteed that the mergeCredit is between 0 and mergeDebt.
        -- The mergeDebt fits in an Int, therefore the result does too.
        -- Therefore the undefined behaviour case of timesDivABC_fast is
        -- avoided and the w2i cannot overflow.
        mergeCredits_fast :: Int
mergeCredits_fast = Word -> Int
w2i (Word -> Int) -> Word -> Int
forall a b. (a -> b) -> a -> b
$ Word -> Word -> Word -> Word
timesDivABC_fast (Int -> Word
i2w Int
nominalCredits)
                                                   (Int -> Word
i2w Int
mergeDebt)
                                                   (Int -> Word
i2w Int
nominalDebt)
     in Bool -> MergeCredits -> MergeCredits
forall a. (?callStack::CallStack) => Bool -> a -> a
assert (Int
0 Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< Int
nominalDebt) (MergeCredits -> MergeCredits) -> MergeCredits -> MergeCredits
forall a b. (a -> b) -> a -> b
$
        Bool -> MergeCredits -> MergeCredits
forall a. (?callStack::CallStack) => Bool -> a -> a
assert (Int
0 Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
nominalCredits Bool -> Bool -> Bool
&& Int
nominalCredits Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
nominalDebt) (MergeCredits -> MergeCredits) -> MergeCredits -> MergeCredits
forall a b. (a -> b) -> a -> b
$
        Bool -> MergeCredits -> MergeCredits
forall a. (?callStack::CallStack) => Bool -> a -> a
assert (Int
mergeCredits_spec Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
mergeCredits_fast) (MergeCredits -> MergeCredits) -> MergeCredits -> MergeCredits
forall a b. (a -> b) -> a -> b
$
        Int -> MergeCredits
MergeCredits Int
mergeCredits_fast
  where
    {-# INLINE i2w #-}
    {-# INLINE w2i #-}
    i2w :: Int -> Word
    w2i :: Word -> Int
    i2w :: Int -> Word
i2w = Int -> Word
forall a b. (Integral a, Num b) => a -> b
fromIntegral
    w2i :: Word -> Int
w2i = Word -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral

-- | Compute @(a * b) `div` c@ for unsigned integers for the full range of
-- 64bit unsigned integers, provided that @a <= c@ and thus the result will
-- fit in 64bits.
--
-- The @a * b@ intermediate result is computed using 128bit precision.
--
-- Note: the behaviour is undefined if the result will not fit in 64bits.
-- It will probably result in immediate termination with SIGFPE.
--
timesDivABC_fast :: Word -> Word -> Word -> Word
timesDivABC_fast :: Word -> Word -> Word -> Word
timesDivABC_fast (W# Word#
a) (W# Word#
b) (W# Word#
c) =
    case Word# -> Word# -> (# Word#, Word# #)
timesWord2# Word#
a Word#
b of
      (# Word#
ph, Word#
pl #) ->
            case Word# -> Word# -> Word# -> (# Word#, Word# #)
quotRemWord2# Word#
ph Word#
pl Word#
c of
              (# Word#
q, Word#
_r #) -> Word# -> Word
W# Word#
q

{-# SPECIALISE immediatelyCompleteIncomingRun ::
     TableConfig
  -> LevelNo
  -> IncomingRun IO h
  -> IO (Ref (Run IO h)) #-}
-- | Supply enough credits to complete the merge now.
immediatelyCompleteIncomingRun ::
     (MonadSTM m, MonadST m, MonadMVar m, MonadMask m)
  => TableConfig
  -> LevelNo
  -> IncomingRun m h
  -> m (Ref (Run m h))
immediatelyCompleteIncomingRun :: forall (m :: * -> *) h.
(MonadSTM m, MonadST m, MonadMVar m, MonadMask m) =>
TableConfig -> LevelNo -> IncomingRun m h -> m (Ref (Run m h))
immediatelyCompleteIncomingRun TableConfig
conf LevelNo
ln IncomingRun m h
ir =
    case IncomingRun m h
ir of
      Single Ref (Run m h)
r -> Ref (Run m h) -> m (Ref (Run m h))
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadThrow m,
 ?callStack::CallStack) =>
Ref obj -> m (Ref obj)
dupRef Ref (Run m h)
r
      Merging MergePolicyForLevel
_ (NominalDebt Int
nominalDebt) PrimVar (PrimState m) NominalCredits
nominalCreditsVar Ref (MergingRun LevelMergeType m h)
mr -> do

        NominalCredits Int
nominalCredits <- PrimVar (PrimState m) NominalCredits -> m NominalCredits
forall (m :: * -> *) a.
(PrimMonad m, Prim a) =>
PrimVar (PrimState m) a -> m a
readPrimVar PrimVar (PrimState m) NominalCredits
nominalCreditsVar
        let !deposit :: NominalCredits
deposit = Int -> NominalCredits
NominalCredits (Int
nominalDebt Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
nominalCredits)
        TableConfig -> LevelNo -> IncomingRun m h -> NominalCredits -> m ()
forall (m :: * -> *) h.
(MonadSTM m, MonadST m, MonadMVar m, MonadMask m) =>
TableConfig -> LevelNo -> IncomingRun m h -> NominalCredits -> m ()
supplyCreditsIncomingRun TableConfig
conf LevelNo
ln IncomingRun m h
ir NominalCredits
deposit

        -- This ensures the merge is really completed. However, we don't
        -- release the merge yet, but we do return a new reference to the run.
        Ref (MergingRun LevelMergeType m h) -> m (Ref (Run m h))
forall (m :: * -> *) t h.
(MonadMVar m, MonadSTM m, MonadST m, MonadMask m) =>
Ref (MergingRun t m h) -> m (Ref (Run m h))
MR.expectCompleted Ref (MergingRun LevelMergeType m h)
mr