{-# LANGUAGE CPP             #-}
{-# LANGUAGE PatternSynonyms #-}
{-# OPTIONS_HADDOCK not-home #-}

-- | An incremental merge of multiple runs.
module Database.LSMTree.Internal.MergingRun (
    -- * Merging run
    MergingRun
  , RunParams (..)
  , new
  , newCompleted
  , duplicateRuns
  , remainingMergeDebt
  , supplyChecked
  , supplyCreditsRelative
  , supplyCreditsAbsolute
  , expectCompleted
  , snapshot
  , totalMergeDebt
  , mergeType

    -- * Merge types
  , IsMergeType (..)
  , LevelMergeType (..)
  , TreeMergeType (..)

    -- * Credit tracking
    -- $credittracking
  , MergeDebt (..)
  , numEntriesToMergeDebt
  , MergeCredits (..)
  , CreditThreshold (..)
  , SpentCredits (..)
  , UnspentCredits (..)

  -- * Concurrency
  -- $concurrency

    -- * Internal state
  , pattern MergingRun
  , mergeState
  , MergingRunState (..)
  , MergeKnownCompleted (..)
  , CreditsVar (..)
  , pattern CreditsPair

    -- * Errors
  , TableTooLargeError (..)
  ) where

import           Control.ActionRegistry
import           Control.Concurrent.Class.MonadMVar.Strict
import           Control.DeepSeq (NFData (..))
import           Control.Monad (when)
import           Control.Monad.Class.MonadST (MonadST)
import           Control.Monad.Class.MonadSTM (MonadSTM (..))
import           Control.Monad.Class.MonadThrow (Exception,
                     MonadCatch (bracketOnError), MonadMask,
                     MonadThrow (throwIO))
import           Control.Monad.Primitive
import           Control.RefCount
import           Data.Bits
import           Data.Maybe (fromMaybe)
import           Data.Primitive.MutVar
import           Data.Primitive.PrimVar
import qualified Data.Vector as V
import           Database.LSMTree.Internal.Assertions (assert)
import           Database.LSMTree.Internal.Entry (NumEntries (..))
import           Database.LSMTree.Internal.Lookup (ResolveSerialisedValue)
import           Database.LSMTree.Internal.Merge (IsMergeType (..),
                     LevelMergeType (..), Merge, RunParams (..),
                     StepResult (..), TreeMergeType (..))
import qualified Database.LSMTree.Internal.Merge as Merge
import           Database.LSMTree.Internal.Paths (RunFsPaths (..))
import           Database.LSMTree.Internal.Run (Run)
import qualified Database.LSMTree.Internal.Run as Run
import           GHC.Stack (HasCallStack, callStack)
import           System.FS.API (HasFS)
import           System.FS.BlockIO.API (HasBlockIO)

-- | An incremental merge of multiple runs, bundled with the credit counters,
-- the cached completion flag, the mutable merge state and the reference
-- counter that manage it.
data MergingRun t m h = MergingRun {

      -- | The total merge debt.
      --
      -- This corresponds to the sum of the number of entries in the input runs.
      mergeDebt           :: !MergeDebt

      -- See $credittracking

      -- | A pair of counters for tracking supplied credits:
      --
      -- 1. The supplied credits that have been spent on merging steps plus the
      --    supplied credits that are in the process of being spent.
      -- 2. The supplied credits that have not been spent and are available to
      --    spend.
      --
      -- The counters are always read & modified together atomically.
      --
    , mergeCreditsVar     :: !(CreditsVar (PrimState m))

      -- | A variable that caches knowledge about whether the merge has been
      -- completed. If 'MergeKnownCompleted', then we are sure the merge has
      -- been completed, otherwise if 'MergeMaybeCompleted' we have to check the
      -- 'MergingRunState'.
    , mergeKnownCompleted :: !(MutVar (PrimState m) MergeKnownCompleted)

      -- | The state of the merge itself: either the completed output run, or
      -- the input runs together with the ongoing 'Merge'. Held in an MVar so
      -- that readers and the finaliser can access it exclusively.
    , mergeState          :: !(StrictMVar m (MergingRunState t m h))

      -- | The reference counter backing the 'RefCounted' instance.
    , mergeRefCounter     :: !(RefCounter m)
    }

-- | A 'MergingRun' is reference counted through its 'mergeRefCounter' field.
instance RefCounted m (MergingRun t m h) where
    getRefCounter = mergeRefCounter

-- | The state of a merging run: either the merge has finished and only its
-- output run remains, or it is still in progress and holds on to its input
-- runs together with the underlying 'Merge'.
data MergingRunState t m h =
    CompletedMerge
      !(Ref (Run m h))
      -- ^ Output run
  | OngoingMerge
      !(V.Vector (Ref (Run m h)))
      -- ^ Input runs
      !(Merge t m h)
      -- ^ The merge over the input runs

-- | Cached knowledge about whether a merge has been completed.
--
-- 'MergeKnownCompleted' means the merge is certainly complete;
-- 'MergeMaybeCompleted' means the 'MergingRunState' has to be consulted.
data MergeKnownCompleted = MergeKnownCompleted | MergeMaybeCompleted
  deriving stock Eq

-- | Both constructors are nullary, so matching on the constructor already
-- evaluates the value fully.
instance NFData MergeKnownCompleted where
  rnf MergeKnownCompleted = ()
  rnf MergeMaybeCompleted = ()

{-# SPECIALISE new ::
     Merge.IsMergeType t
  => HasFS IO h
  -> HasBlockIO IO h
  -> ResolveSerialisedValue
  -> RunParams
  -> t
  -> RunFsPaths
  -> V.Vector (Ref (Run IO h))
  -> IO (Ref (MergingRun t IO h)) #-}
-- | Create a new merging run, returning a reference to it that must ultimately
-- be released via 'releaseRef'.
--
-- Duplicates the supplied references to the runs.
--
-- Empty input runs are left out of the merge. If all input runs are empty, no
-- merge is started: the result is an already-completed merging run holding a
-- freshly created empty run, with a merge debt of 0.
--
-- The vector of input runs must not be empty (checked by an assertion).
--
-- Throws 'ErrTableTooLarge' (via 'unsafeNew') if the total merge debt is too
-- large. In that case, and on any other exception, the duplicated run
-- references are released again and the freshly created merge is aborted.
--
-- This function should be run with asynchronous exceptions masked to prevent
-- failing after internal resources have already been created.
new ::
     (Merge.IsMergeType t, MonadMVar m, MonadMask m, MonadSTM m, MonadST m)
  => HasFS m h
  -> HasBlockIO m h
  -> ResolveSerialisedValue
  -> RunParams
  -> t
  -> RunFsPaths
  -> V.Vector (Ref (Run m h))
  -> m (Ref (MergingRun t m h))
new hfs hbio resolve runParams ty runPaths inputRuns =
    assert (V.length inputRuns > 0) $ do
    -- there can be empty runs, which we don't want to include in the merge
    -- TODO: making runs non-empty would involve introducing a constructor
    -- @CompletedMergeEmpty@, but would simplify things and should be possible.
    let nonEmptyRuns = V.filter (\r -> Run.size r > NumEntries 0) inputRuns
    -- If creating the Merge fails, we must release the references again.
    withActionRegistry $ \reg -> do
      let dupRun r = withRollback reg (dupRef r) releaseRef
      case V.length nonEmptyRuns of
        0 -> do
          -- we can't have an empty merge, but create a new empty run
          --
          -- potentially, we could have re-used one of the empty input runs (or
          -- even re-used a single non-empty input run if there are no others),
          -- as we do in the prototype. but that would mean that the result
          -- doesn't follow the supplied @runParams@.
          -- TODO: decide whether that optimisation is okay
          r <- Run.newEmpty hfs hbio runParams runPaths
          unsafeNew
            (MergeDebt 0)
            (SpentCredits 0)
            MergeKnownCompleted
            (CompletedMerge r)
        _ -> do
          rs <- V.mapM dupRun nonEmptyRuns
          -- The merge holds its own resources on the input runs (see the
          -- finaliser in 'unsafeNew'), so it must be aborted if 'unsafeNew'
          -- throws, e.g. 'ErrTableTooLarge'. Registering the abort after the
          -- run duplications is intended to have it run before the runs are
          -- released again.
          -- NOTE(review): assumes rollback actions run in reverse order of
          -- registration -- confirm against "Control.ActionRegistry".
          merge <- fromMaybe (error "newMerge: merges can not be empty")
            <$> withRollback reg
                  (Merge.new hfs hbio runParams ty resolve runPaths rs)
                  (mapM_ Merge.abort)
          unsafeNew
            (numEntriesToMergeDebt (V.foldMap' Run.size rs))
            (SpentCredits 0)
            MergeMaybeCompleted
            (OngoingMerge rs merge)

{-# SPECIALISE newCompleted ::
     MergeDebt
  -> Ref (Run IO h)
  -> IO (Ref (MergingRun t IO h)) #-}
-- | Create a merging run that is already in the completed state, returning a
-- reference that must ultimately be released via 'releaseRef'.
--
-- Duplicates the supplied reference to the run. If constructing the merging
-- run fails (e.g. with 'ErrTableTooLarge'), the duplicated reference is
-- released again.
--
-- This function should be run with asynchronous exceptions masked to prevent
-- failing after internal resources have already been created.
newCompleted ::
     (MonadMVar m, MonadMask m, MonadSTM m, MonadST m)
  => MergeDebt -- ^ Since there are no longer any input runs, we need to be
               -- told what the merge debt was.
  -> Ref (Run m h)
  -> m (Ref (MergingRun t m h))
newCompleted mergeDebt inputRun = do
    bracketOnError (dupRef inputRun) releaseRef $ \run ->
      unsafeNew
        mergeDebt
        (SpentCredits (mergeDebtAsCredits mergeDebt)) -- since it is completed
        MergeKnownCompleted
        (CompletedMerge run)

-- | The table contains a run that has more than \(2^{40}\) physical entries.
data TableTooLargeError
    = ErrTableTooLarge
    deriving stock (Show, Eq)
    deriving anyclass (Exception)

{-# INLINE unsafeNew #-}
-- | Assemble a 'MergingRun' from its parts and wrap it in a fresh reference.
--
-- No references are duplicated here: the runs (and the merge, if any) inside
-- the supplied 'MergingRunState' are taken over by the new merging run, whose
-- finaliser releases them once the last reference is released.
--
-- Throws 'ErrTableTooLarge' if the merge debt exceeds the largest
-- representable amount of spent credits. Nothing is released in that case, so
-- the caller stays responsible for the resources in the supplied state.
unsafeNew ::
     (MonadMVar m, MonadMask m, MonadSTM m, MonadST m)
  => MergeDebt
  -> SpentCredits
  -> MergeKnownCompleted
  -> MergingRunState t m h
  -> m (Ref (MergingRun t m h))
unsafeNew (MergeDebt mergeDebt) _ _ _
  | SpentCredits mergeDebt > maxBound
  = throwIO ErrTableTooLarge

unsafeNew mergeDebt (SpentCredits spentCredits)
          knownCompleted state = do
    -- A new merging run starts out without any unspent credits.
    let !credits = CreditsPair (SpentCredits spentCredits) (UnspentCredits 0)
    mergeCreditsVar <- CreditsVar <$> newPrimVar credits
    mergeKnownCompleted <- newMutVar knownCompleted
    mergeState <- newMVar $! state
    newRef (finalise mergeState) $ \mergeRefCounter ->
      MergingRun {
        mergeDebt
      , mergeCreditsVar
      , mergeKnownCompleted
      , mergeState
      , mergeRefCounter
      }
  where
    -- Release everything held by the merging run's state.
    finalise var = withMVar var $ \case
        CompletedMerge r ->
          releaseRef r
        OngoingMerge rs m -> do
          -- The RunReaders in the Merge keep their own file handles to the
          -- run kopsFile open. We must close these handles *before* we release
          -- the runs themselves, which will close and delete the files.
          -- Otherwise we would be removing files that still have open handles
          -- (which does not work on Windows, and is caught by the MockFS).
          Merge.abort m
          V.forM_ rs releaseRef

-- | Create references to the runs that should be queried for lookups.
-- In particular, if the merge is not complete, these are the input runs.
-- If the merge is complete, the result is the single output run.
--
-- The returned references are fresh duplicates, which the caller must
-- release. If duplicating one of the input runs fails, the duplicates made so
-- far are released again.
--
-- TODO: This interface doesn't work well with the action registry. Just doing
-- @withRollback reg (duplicateRuns mr) (mapM_ releaseRef)@ isn't exception-safe
-- since if one of the @releaseRef@ calls fails, the following ones aren't run.
{-# SPECIALISE duplicateRuns ::
     Ref (MergingRun t IO h) -> IO (V.Vector (Ref (Run IO h))) #-}
duplicateRuns ::
     (PrimMonad m, MonadMVar m, MonadMask m)
  => Ref (MergingRun t m h)
  -> m (V.Vector (Ref (Run m h)))
duplicateRuns (DeRef mr) =
    -- We take the references while holding the MVar to make sure the MergingRun
    -- does not get completed concurrently before we are done.
    withMVar (mergeState mr) $ \case
      CompletedMerge r  -> V.singleton <$> dupRef r
      OngoingMerge rs _ -> withActionRegistry $ \reg ->
        V.mapM (\r -> withRollback reg (dupRef r) releaseRef) rs

-- | Take a snapshot of the state of a merging run.
--
-- Returns the total merge debt, the supplied credits (the sum of the spent and
-- unspent credits) and the current 'MergingRunState'. The state and the
-- credits are read one after the other, not in a single atomic step.
--
-- TODO: this is not concurrency safe! The inputs runs to the merging run could
-- be released concurrently by another thread that completes the merge, while
-- the snapshot is taking place. The solution is for snapshot here to duplicate
-- the runs it returns _while_ holding the mergeState MVar (to exclude threads
-- that might concurrently complete the merge). And then the caller of course
-- must be updated to release the extra references.
--
{-# SPECIALISE snapshot ::
     Ref (MergingRun t IO h)
  -> IO (MergeDebt,
         MergeCredits,
         MergingRunState t IO h) #-}
snapshot ::
     (PrimMonad m, MonadMVar m)
  => Ref (MergingRun t m h)
  -> m (MergeDebt,
        MergeCredits,
        MergingRunState t m h)
snapshot (DeRef MergingRun {..}) = do
    state <- readMVar mergeState
    (SpentCredits   spent,
     UnspentCredits unspent) <- atomicReadCredits mergeCreditsVar
    let supplied = spent + unspent
    return (mergeDebt, supplied, state)

-- | The total merge debt of a merging run.
--
-- This reads the immutable 'mergeDebt' field only, so it is a pure function.
totalMergeDebt :: Ref (MergingRun t m h) -> MergeDebt
totalMergeDebt (DeRef MergingRun {mergeDebt}) = mergeDebt

{-# INLINE mergeType #-}
-- | The type of the merge, if it is still ongoing.
--
-- Returns 'Nothing' once the merge has completed, since the completed state
-- only holds the output run and no longer the 'Merge' that records the type.
mergeType :: MonadMVar m => Ref (MergingRun t m h) -> m (Maybe t)
mergeType (DeRef mr) = do
    s <- readMVar (mergeState mr)
    return $ case s of
      CompletedMerge _ -> Nothing
      OngoingMerge _ m -> Just (Merge.mergeType m)

{-------------------------------------------------------------------------------
  Credits
-------------------------------------------------------------------------------}

{- $credittracking

The credits and debt concept we use here comes from amortised analysis of data
structures (see the Banker's Method from Okasaki). Though here we use it not as
an analysis method but within the code itself for tracking the state of the
scheduled (i.e. incremental) merge.

There are two notions of credits (and corresponding debt) in this LSM
implementation: nominal credits and merge credits. The merging run deals
exclusively with merge credits. See 'IncomingRun' for nominal credits.

A single merge credit corresponds with a merge step performed. Merge steps are
measured by the number of entries in the input runs that are consumed. We
measure the merge in terms of inputs, not outputs, because the number of inputs
is known beforehand, whereas the number of outputs is not. The total merge debt
is therefore defined to be the sum of the number of entries across the input
runs to the merge. Once the merge credits spent equals the merge debt then the
merge is (or rather must be) complete.

In both the prototype and implementation we accumulate unspent credits until
they reach a threshold at which point we do a batch of merging work. We track
both credits spent and credits as yet unspent.

In the prototype, the credits spent equals the merge steps performed. The
same holds in the real implementation, but making it so is more complicated.
When we spend credits on merging work, the number of steps we perform is not
guaranteed to be the same as the credits supplied. For example we may ask to do
100 credits of merging work, but the merge code (for perfectly sensible
efficiency reasons) will decide to do 102 units of merging work. The rule is
that we may do (slightly) more work than the credits supplied but not less.
To account for this we spend more credits, corresponding to the excess merging
work performed. We spend them by borrowing them from the unspent credits, which
may leave the unspent credits with a negative balance.

Furthermore, the real implementation has to cope with concurrency: multiple
threads sharing the same 'MergingRun' and calling 'supplyCredits' concurrently.
The credit accounting thus needs to define the state of the credits while
merging work is in progress by some thread. The approach we take is to define
spent credit to include credits that are in the process of being spent,
leaving unspent credit as credits that are available for a thread to spend on
merging work.

Thus we track three things:

 * spent credits ('SpentCredits'): credits supplied that have been or are in
   the process of being spent on performing merging steps;

 * unspent credits ('UnspentCredits'): credits supplied that are not yet spent
   and are thus available to spend; and

 * merge debt ('MergeDebt'): the sum of the sizes of the input runs, and thus
   the total merge credits that have to be spent for the merge to be complete.

And define a derived measure:

 * supplied credits: the sum of the spent and unspent credits. This is
   therefore also the sum of all the credits that have been (successfully)
   supplied to a merging run via 'supplyCredits'.

   The supplied credits increases monotonically, even in the presence of
   (a)synchronous exceptions.

   We guarantee that the supplied credits never exceeds the total debt.

When the supplied credits equals the merge debt then we may not have actually
completed the merge (since that requires spending the credits) but we have the
potential to complete the merge whenever needed without supplying any more
credits.

The credits spent and the steps performed (or in the process of being
performed) will typically be equal. They are not guaranteed to be equal in the
presence of exceptions (synchronous or asynchronous). In this case we offer a
weaker guarantee: a merge /may/ progress more steps than the number of
credits that were spent. If an exception happens at some point during merging
work, we will \"unspend\" all the credits we intended to spend, but we will not
revert all merging steps that we already successfully performed before the
exception. Thus we may do more merging steps than the credits we accounted as
spent. This makes the implementation simple, and merges will still finish in
time. It would be bad if we did not put back credits, because then a merge
might not finish in time, which will mess up the shape of the levels tree.
-}

-- | A number of merge credits. A single merge credit corresponds with a single
-- merge step, measured as one entry consumed from the input runs.
--
newtype MergeCredits = MergeCredits Int
  deriving stock (Eq, Ord, Show)
  deriving newtype (Num, Real, Enum, Integral, NFData)

-- | The total merge debt: the sum of the sizes of the input runs, and thus the
-- total merge credits that have to be spent for the merge to be complete.
--
newtype MergeDebt = MergeDebt MergeCredits
  deriving stock (Eq, Ord)
  deriving newtype (NFData)

-- | View a 'MergeDebt' as the plain number of 'MergeCredits' it wraps.
mergeDebtAsCredits :: MergeDebt -> MergeCredits
mergeDebtAsCredits (MergeDebt c) = c

{-# INLINE numEntriesToMergeDebt #-}
-- | The total debt of the merging run is exactly the sum total number of
-- entries across all the input runs to be merged.
--
-- The entry count is re-wrapped unchanged: one entry is one credit of debt.
--
numEntriesToMergeDebt :: NumEntries -> MergeDebt
numEntriesToMergeDebt (NumEntries n) = MergeDebt (MergeCredits n)

-- | Unspent credits are accumulated until they reach the 'CreditThreshold',
-- after which a batch of merge work will be performed. Configuring this
-- threshold makes it possible to strike a balance between spreading out
-- I\/O and achieving good (concurrent) performance.
--
-- Note that ideally the batch size for different LSM levels should be
-- co-prime so that merge work at different levels is not synchronised.
--
newtype CreditThreshold = CreditThreshold UnspentCredits
  deriving stock Show

-- | The spent credits are supplied credits that have been spent on performing
-- merging steps plus the supplied credits that are in the process of being
-- spent (by some thread calling 'supplyCredits').
--
newtype SpentCredits = SpentCredits MergeCredits
  deriving newtype (Eq, Ord, Show)

-- | 40 bit unsigned number: the bounds are @0@ and @2^40 - 1@.
instance Bounded SpentCredits where
    minBound = SpentCredits 0
    maxBound = SpentCredits (MergeCredits (1 `unsafeShiftL` 40 - 1))

-- | The unspent credits are supplied credits that have not yet been spent on
-- performing merging steps and are available to spend.
--
-- Note: unspent credits may be negative! This can occur when more merge
-- steps were performed than there were credits to cover. In this case the
-- credits are borrowed from the unspent credits, which may result in the
-- current unspent credits being negative for a time.
--
newtype UnspentCredits = UnspentCredits MergeCredits
  deriving newtype (Eq, Ord, Show)

-- | 24 bit signed number: the bounds are @-2^23@ and @2^23 - 1@.
instance Bounded UnspentCredits where
    minBound = UnspentCredits (MergeCredits ((-1) `unsafeShiftL` 23))
    maxBound = UnspentCredits (MergeCredits (  1  `unsafeShiftL` 23 - 1))

-- | This holds the pair of the 'SpentCredits' and the 'UnspentCredits'. All
-- operations on this pair are atomic.
--
-- The model to think about is a @TVar (SpentCredits, UnspentCredits)@ but the
-- physical representation is a single mutable unboxed 64bit signed @Int@,
-- using 40 bits for the spent credits and 24 for the unspent credits. The
-- spent credits are unsigned, while the unspent credits are signed, so 40 bits
-- and 23+1 bits respectively. This imposes a limit of just over 1 trillion for
-- the spent credits and thus run size, and 8.3 million for the unspent credits
-- (23 + sign bit).
--
-- In the packed word the spent credits occupy the upper 40 bits and the
-- unspent credits occupy the lower 24 bits.
--
-- If these limits ever become restrictive, then the implementation could be
-- changed to use a TVar or a double-word CAS (DWCAS, i.e. 128bit).
--
newtype CreditsVar s = CreditsVar (PrimVar s Int)

-- | Bidirectional view of a packed credits word as its 'SpentCredits' and
-- 'UnspentCredits' components: matching unpacks the word with
-- 'unpackCreditsPair', and construction packs the pair with
-- 'packCreditsPair'.
pattern CreditsPair :: SpentCredits -> UnspentCredits -> Int
pattern CreditsPair sc uc <- (unpackCreditsPair -> (sc, uc))
  where
    CreditsPair sc uc = packCreditsPair sc uc
#if MIN_VERSION_GLASGOW_HASKELL(9,2,0,0)
{-# INLINE CreditsPair #-}
#endif
{-# COMPLETE CreditsPair #-}

{-# INLINE packCreditsPair #-}
-- | Pack a credits pair into a single @Int@.
--
-- The spent credits are shifted above the low 24 bits, and the unspent credits
-- are masked down to the low 24 bits (so a negative value is stored in its
-- truncated two's complement form). Both components are asserted to lie within
-- their 'Bounded' ranges before packing.
--
packCreditsPair :: SpentCredits -> UnspentCredits -> Int
packCreditsPair spent@(SpentCredits (MergeCredits sc))
                unspent@(UnspentCredits (MergeCredits uc)) =
      assert (spent   >= minBound && spent   <= maxBound) $
      assert (unspent >= minBound && unspent <= maxBound) $

      sc `unsafeShiftL` 24
  .|. (uc .&. 0xffffff)

{-# INLINE unpackCreditsPair #-}
-- | Split a packed credits word back into its spent and unspent components.
--
-- The spent credits are everything above the low 24 bits; the unspent credits
-- are the low 24 bits, sign-extended.
--
unpackCreditsPair :: Int -> (SpentCredits, UnspentCredits)
unpackCreditsPair cp =
    -- we use unsigned shift for spent, and sign extending shift for unspent
    ( SpentCredits   (MergeCredits (w2i (i2w cp `unsafeShiftR` 24)))
      -- shifting left by 40 and back right by 40 relies on the 64bit @Int@
      -- representation to sign-extend the low 24 bits
    , UnspentCredits (MergeCredits ((cp `unsafeShiftL` 40) `unsafeShiftR` 40))
    )
  where
    i2w :: Int -> Word
    w2i :: Word -> Int
    i2w = fromIntegral
    w2i = fromIntegral

{-------------------------------------------------------------------------------
  Credit transactions
-------------------------------------------------------------------------------}

{- $concurrency

Merging runs can be shared across tables, which means that multiple threads can
contribute to the same merge concurrently. The design to contribute credits to
the same merging run is largely lock-free. It ensures consistency of the
unspent credits and the merge state, while allowing threads to progress without
waiting on other threads.

The entry point for merging is 'supplyCredits'. This may be called by
concurrent threads that share the same merging run. No locks are held
initially.

The credits to supply can be specified as either an absolute or relative value.
That is, we can ask that the number of supplied credits be set to a value N, or
we can specify an additional N credits. The supplied credits are monotonically
increasing so in the absolute case, there is no change if the requested new
supplied credit value is less than the current value. Supplying credits from
the levels (via incoming runs) uses absolute credits, while supplying credits
from merging trees uses relative credits.

The main lock we will discuss is the 'mergeState' 'StrictMVar', and we will
refer to it as the merge lock.

We get the easy things out of the way first: the 'mergeKnownCompleted'
variable is purely an optimisation. It starts out as 'MergeMaybeCompleted'
and is only ever modified once to 'MergeKnownCompleted'. It is modified with
the merge lock held, but read without the lock. It does not matter if a thread
reads a stale value of 'MergeMaybeCompleted'. We can analyse the remainder of
the algorithm as if we were always in the 'MergeMaybeCompleted' state.

Variable access and locks:

* 'CreditsVar' contains the pair of the current 'SpentCredits' and
  'UnspentCredits'. It is only operated upon using transactions (atomic CAS),
  and most of these transactions are done without the merge lock held.
  The two constituent components can increase and decrease, but the total
  supplied credits (sum of spent and unspent) can only increase.

* 'MergeState' contains the state of the merge itself. It is protected by the
  merge lock.

First, we do a moderately complex transaction 'atomicDepositAndSpendCredits',
which does the following:

 * Deposit credits to the unspent pot, while guaranteeing that the total
   supplied credits does not exceed the total debt for the merging run.
 * Compute any leftover credits (that would have exceeded the total debt).
 * Compute the credits to spend on performing merge steps, depending on which
   of three cases we are in:

    1. we have supplied enough credits to complete the merge;
    2. not case 1, but enough unspent credits have accumulated to do a batch of
       merge work;
    3. not case 1 or 2, not enough credits to do any merge work.

  * Update the spent and unspent pots
  * Return the credits to spend now and any leftover credits.

If there are now credits to spend, then we attempt to perform that number of
merging steps. While doing the merging work, the (more expensive) merge lock is
taken to ensure that the merging work itself is performed only sequentially.

Note that it is not guaranteed that the merge gets completed, even if the
credits supplied has reached the total debt. It may be interrupted during the
merge (by an async exception). This does not matter because the merge will be
completed in 'expectCompleted'. Completing early is an optimisation.

If an exception occurs during the merge then the credits that were in the
process of being spent are transferred back from the spent to the unspent pot
using 'atomicSpendCredits' (with a negative amount). It is this case that
implies that the spent credits may not increase monotonically, even though the
supplied credits do increase monotonically.

Once performing merge steps is done, if it turns out that excess merge steps
were performed then we must do a further accounting transaction:
'atomicSpendCredits' to spend the excess credits. This is done without respect
to the balance of the unspent credits, which may result in the unspent credit
balance becoming negative. This is ok, and will result in more credits having
to be supplied next time before reaching the credit batch threshold. The
unspent credits can not be negative by the time the merge is complete because
the performing of merge steps cannot do excess steps when it reaches the end of
the merge.

-}

{-# INLINE atomicReadCredits #-}
-- | Atomically read the packed credits word and return it unpacked as the
-- current spent and unspent credits.
atomicReadCredits ::
     PrimMonad m
  => CreditsVar (PrimState m)
  -> m (SpentCredits, UnspentCredits)
atomicReadCredits (CreditsVar v) =
    unpackCreditsPair <$> atomicReadInt v

{-# INLINE atomicModifyInt #-}
-- | Atomically modify a single mutable integer variable, using a CAS loop.
--
-- The function maps the current value to the new value to store and a result
-- to return. It may be applied more than once: whenever the compare-and-swap
-- observes a value other than the one the function was applied to, the
-- function is re-applied to the observed value and the swap is retried.
--
atomicModifyInt ::
     PrimMonad m
  => PrimVar (PrimState m) Int
  -> (Int -> (Int, a))
  -> m a
atomicModifyInt var f =
    readPrimVar var >>= casLoop
  where
    casLoop !before = do
      let (!after, !result) = f before
      -- casInt returns the value it found in the variable; the swap took
      -- effect only if that is the value we computed 'after' from.
      before' <- casInt var before after
      if before' == before
        then return result
        else casLoop before'

-- | Credits supplied using a relative value or an absolute value.
--
-- The first field says how the amount is to be interpreted, and the second
-- field is the amount of credits itself.
data SupplyMergeCredits = SupplyMergeCredits
                            !SupplyRelativeOrAbsolute
                            !MergeCredits
-- Note this is deliberately represented as a product type, not a sum type, to
-- get better unboxing in function args.

-- | Should we supply credits using a relative value or an absolute value.
--
-- Relative means supplying an additional N credits; absolute means asking for
-- the number of supplied credits to be set to the value N.
data SupplyRelativeOrAbsolute = SupplyRelative | SupplyAbsolute

{-# SPECIALISE atomicDepositAndSpendCredits ::
     CreditsVar RealWorld
  -> MergeDebt
  -> CreditThreshold
  -> SupplyMergeCredits
  -> IO (MergeCredits, MergeCredits, MergeCredits, MergeCredits) #-}
-- | Atomically: add to the unspent credits pot, subject to the supplied
-- credits not exceeding the total debt. Return the new spent and unspent
-- credits, plus any leftover credits in excess of the total debt.
--
-- This is the only operation that changes the total supplied credits, and in
-- a non-decreasing way. Hence overall the supplied credits is monotonically
-- non-decreasing.
--
atomicDepositAndSpendCredits ::
     PrimMonad m
  => CreditsVar (PrimState m)
  -> MergeDebt -- ^ total debt
  -> CreditThreshold
  -> SupplyMergeCredits
  -> m (MergeCredits, MergeCredits, MergeCredits, MergeCredits)
     -- ^ (suppliedBefore, suppliedAfter, spendCredits, leftoverCredits)
atomicDepositAndSpendCredits (CreditsVar !var) (MergeDebt !totalDebt)
                             (CreditThreshold !batchThreshold)
                             (SupplyMergeCredits !supplyRelOrAbs !credits) =
    assert (credits >= 0) $
    atomicModifyInt var $ \(CreditsPair !spent !unspent) ->
      let -- First deposit: this only ever moves credits into the unspent pot.
          (supplied, supplied', unspent', leftover)
            = depositCredits spent unspent supplyRelOrAbs credits

          -- Then decide how much of the unspent pot to spend right now.
          (spend, spent'', unspent'')
            -- 1. supplied enough credits to complete the merge;
            | supplied' == totalDebt
            = spendAllCredits   spent unspent'

            -- 2. not case 1, but enough unspent credits have accumulated to do
            -- a batch of merge work;
            | unspent' >= batchThreshold
            = spendBatchCredits spent unspent' batchThreshold

            -- 3. not case 1 or 2, not enough credits to do any merge work.
            | otherwise
            = (0, spent, unspent')

       in txResultFor supplied spent'' unspent'' spend leftover
  where
    -- Build the result of the atomic transaction: the new packed state of the
    -- credits variable, paired with the 4-tuple returned to the caller.
    txResultFor ::
         MergeCredits
      -> SpentCredits
      -> UnspentCredits
      -> MergeCredits
      -> MergeCredits
      -> (Int, (MergeCredits, MergeCredits, MergeCredits, MergeCredits))
    txResultFor !supplied (SpentCredits !spent) (UnspentCredits !unspent)
                !spend !leftover =
      let !after     = CreditsPair (SpentCredits spent) (UnspentCredits unspent)
          !supplied' = spent + unspent
          !result    = (supplied, supplied', spend, leftover)

       in -- the supplied credits never decrease and never exceed the debt
          assert (supplied  <= supplied') $
          assert (supplied' <= totalDebt) $
          (after, result)

    -- Add credits to the unspent pot. The result is
    -- (suppliedBefore, suppliedAfter, new unspent, leftover).
    depositCredits ::
         SpentCredits
      -> UnspentCredits
      -> SupplyRelativeOrAbsolute
      -> MergeCredits
      -> (MergeCredits, MergeCredits, UnspentCredits, MergeCredits)
    -- Relative case: deposit the given amount, except for the part that would
    -- take the supplied credits over the total debt, which becomes leftover.
    depositCredits (SpentCredits !spent) (UnspentCredits !unspent)
                   SupplyRelative !deposit =
      let !supplied  = spent + unspent
          !leftover  = max 0 (supplied + deposit - totalDebt)
          !deposit'  = deposit - leftover
          !unspent'  = unspent + deposit'
          !supplied' = spent + unspent'
       in assert (unspent' >= unspent) $
          assert (deposit' >= 0) $
          assert (leftover >= 0 && leftover <= deposit) $
          (supplied, supplied', UnspentCredits unspent', leftover)

    -- Absolute case: raise the supplied credits to the given target, but
    -- never lower them and never go beyond the total debt.
    depositCredits (SpentCredits !spent) (UnspentCredits !unspent)
                   SupplyAbsolute !targetSupplied =
      let !supplied  = spent + unspent
          !supplied' = min totalDebt (max supplied targetSupplied)
          !deposit   = supplied' - supplied
          !leftover  = 0 -- meaningless concept for absolute case
          !unspent'  = unspent + deposit
       in assert (unspent'  >= unspent) $
          assert (supplied' == spent + unspent') $
          assert (deposit   >= 0) $
          (supplied, supplied', UnspentCredits unspent', leftover)

    -- Spend the largest whole multiple of the batch threshold that is
    -- available in the unspent pot. The result is
    -- (credits to spend now, new spent, new unspent).
    spendBatchCredits ::
         SpentCredits
      -> UnspentCredits
      -> UnspentCredits
      -> (MergeCredits, SpentCredits, UnspentCredits)
    spendBatchCredits (SpentCredits !spent) (UnspentCredits !unspent)
                      (UnspentCredits !unspentBatchThreshold) =
      -- numBatches may be zero, in which case the result will be zero
      -- NOTE(review): assumes the batch threshold is non-zero, since it is
      -- used as a divisor here -- confirm against callers.
      let !nBatches = unspent `div` unspentBatchThreshold
          !spend    = nBatches * unspentBatchThreshold
          !spent'   = spent   + spend
          !unspent' = unspent - spend
       in assert (spend >= 0) $
          assert (unspent' < unspentBatchThreshold) $
          assert (spent' + unspent' == spent + unspent) $
          (spend, SpentCredits spent', UnspentCredits unspent')

    -- Spend everything in the unspent pot, leaving it at zero. The result is
    -- (credits to spend now, new spent, new unspent).
    spendAllCredits ::
         SpentCredits
      -> UnspentCredits
      -> (MergeCredits, SpentCredits, UnspentCredits)
    spendAllCredits (SpentCredits !spent) (UnspentCredits !unspent) =
      let spend    = unspent
          spent'   = spent + spend
          unspent' = 0
       in assert (spent' + unspent' == spent + unspent) $
          (spend, SpentCredits spent', UnspentCredits unspent')


{-# SPECIALISE atomicSpendCredits ::
     CreditsVar RealWorld
  -> MergeCredits
  -> IO () #-}
-- | Atomically: transfer the given number of credits from the unspent pot to
-- the spent pot. The new unspent credits balance may be negative.
--
-- The amount to spend can also be negative to reverse a spending transaction.
--
-- The total supplied credits (spent plus unspent) is unchanged.
--
atomicSpendCredits ::
     PrimMonad m
  => CreditsVar (PrimState m)
  -> MergeCredits -- ^ Can be positive to spend, or negative to unspend.
  -> m ()
atomicSpendCredits (CreditsVar var) spend =
    atomicModifyInt var $ \(CreditsPair (SpentCredits   !spent)
                                        (UnspentCredits !unspent)) ->
      let spent'   = spent   + spend
          unspent' = unspent - spend
          -- re-pack both pots into the single Int held by the variable
          after    = CreditsPair (SpentCredits   spent')
                                 (UnspentCredits unspent')
       in -- moving credits between pots must not change their sum
          assert (spent' + unspent' == spent + unspent) $
          (after, ())

{-------------------------------------------------------------------------------
  The main algorithms
-------------------------------------------------------------------------------}

{-# SPECIALISE remainingMergeDebt ::
     Ref (MergingRun t IO h) -> IO (MergeDebt, NumEntries) #-}
-- | Calculate the merge credits required to complete the merge, as well as an
-- upper bound on the size of the resulting run.
--
-- For a completed merge the remaining debt is zero and the size is the size of
-- the resulting run. For an ongoing merge the remaining debt is the total debt
-- minus all credits supplied so far (spent and unspent), and the size is the
-- total debt interpreted as a number of entries.
remainingMergeDebt ::
     (MonadMVar m, PrimMonad m)
  => Ref (MergingRun t m h) -> m (MergeDebt, NumEntries)
remainingMergeDebt (DeRef mr) = do
    readMVar (mergeState mr) >>= \case
      CompletedMerge r -> do
        return (MergeDebt 0, Run.size r)
      OngoingMerge _ _ -> do
        let MergeDebt totalDebt = mergeDebt mr
        -- the total debt doubles as the upper bound on the output size
        let size = let MergeCredits n = totalDebt in NumEntries n
        (SpentCredits spent, UnspentCredits unspent) <-
          atomicReadCredits (mergeCreditsVar mr)
        let debt = totalDebt - (spent + unspent)
        -- supplied credits must never exceed the total debt
        assert (debt >= 0) $ pure ()
        return (MergeDebt debt, size)

{-# INLINE supplyChecked #-}
-- | Helper function to assert common invariants for functions that supply
-- credits.
--
-- It wraps a supply function and returns a function of the same shape. The
-- extra checks around the call (debt is non-negative before and after,
-- leftovers are within bounds, debt was reduced by the credits used) are only
-- compiled in when @NO_IGNORE_ASSERTS@ is defined; otherwise the supply
-- function is called directly and the query function is unused.
supplyChecked ::
     forall m r s. (HasCallStack, Monad m)
  => (r -> m (MergeDebt, s))  -- ^ how to query current debt
  -> (r -> MergeCredits -> m MergeCredits)  -- ^ how to supply
  -> (r -> MergeCredits -> m MergeCredits)
supplyChecked _query supply x credits = do
    assertM $ credits > 0   -- only call them when there are credits to supply
#ifdef NO_IGNORE_ASSERTS
    debt <- fst <$> _query x
    assertM $ debt >= MergeDebt 0 -- debt can't be negative
    leftovers <- supply x credits
    assertM $ leftovers <= credits -- can't have more left than we started with
    assertM $ leftovers >= 0       -- leftovers can't be negative
    debt' <- fst <$> _query x
    assertM $ debt' >= MergeDebt 0
    -- the debt was reduced sufficiently (amount of credits spent)
    assertM $ debt' <= let MergeDebt d = debt
                       in MergeDebt (d - (credits - leftovers))
    return leftovers
#else
    supply x credits
#endif
  where
    -- Monadic wrapper around 'assert' that does nothing when the check passes.
    assertM :: HasCallStack => Bool -> m ()
    assertM p = let _ = callStack in assert p (pure ())
    -- just uses callStack so the constraint is not redundant in release builds

{-# INLINE supplyCreditsRelative #-}
-- | Supply the given amount of credits to a merging run. This /may/ cause an
-- ongoing merge to progress.
--
-- The credits are given in relative terms: as an addition to the current
-- supplied credits. See 'supplyCreditsAbsolute' to set the supplied credits
-- to an absolute value.
--
-- The result is the number of credits left over. This will be non-zero if the
-- credits supplied would take the total supplied credits over the total merge
-- debt.
--
supplyCreditsRelative ::
     forall t m h. (MonadSTM m, MonadST m, MonadMVar m, MonadMask m)
  => Ref (MergingRun t m h)
  -> CreditThreshold
  -> MergeCredits
  -> m MergeCredits
-- The 'flip' moves the threshold in front of the run, so that the remaining
-- @run -> credits -> m credits@ function has the shape 'supplyChecked' wraps.
supplyCreditsRelative = flip $ \th ->
    supplyChecked remainingMergeDebt $ \mr c -> do
      (_suppliedCredits, suppliedCredits', leftoverCredits)
        <- supplyCredits mr th (SupplyMergeCredits SupplyRelative c)

      -- there can only be leftovers if the debt is now fully paid off
      assert (suppliedCredits' == mergeDebtAsCredits (totalMergeDebt mr)
              || leftoverCredits == 0) $
        pure leftoverCredits

{-# INLINE supplyCreditsAbsolute #-}
-- | Set the supplied credits to the given value, unless the current value is
-- already greater. This /may/ cause an ongoing merge to progress.
--
-- The credits are given in absolute terms: as the new value for the current
-- supplied credits. See 'supplyCreditsRelative' to set the supplied credits
-- as a relative addition to the current value.
--
-- The given credit value must be non-negative and no greater than the
-- 'totalMergeDebt'.
--
-- The result is a pair of:
--
--  1. The (absolute value of the) supplied credits beforehand.
--  2. The (absolute value of the) supplied credits afterwards. This will be
--     equal to the given value or to the supplied credits beforehand,
--     whichever is the greater. So it may be more than the specified value,
--     if the current value was already greater than the specified value.
--
supplyCreditsAbsolute ::
     forall t m h. (MonadSTM m, MonadST m, MonadMVar m, MonadMask m)
  => Ref (MergingRun t m h)
  -> CreditThreshold
  -> MergeCredits
  -> m (MergeCredits, MergeCredits)
       -- ^ (suppliedCredits, suppliedCredits')
supplyCreditsAbsolute mr th c =
    assert (0 <= c && c <= mergeDebtAsCredits (totalMergeDebt mr)) $ do
      -- leftover credits are not meaningful for an absolute supply
      (suppliedCredits, suppliedCredits', _leftoverCredits)
        <- supplyCredits mr th (SupplyMergeCredits SupplyAbsolute c)
      assert (suppliedCredits' == max c suppliedCredits) $
        pure (suppliedCredits, suppliedCredits')

{-# SPECIALISE supplyCredits ::
     Ref (MergingRun t IO h)
  -> CreditThreshold
  -> SupplyMergeCredits
  -> IO (MergeCredits, MergeCredits, MergeCredits) #-}
-- | Supply the given amount of credits to a merging run. This /may/ cause an
-- ongoing merge to progress.
--
-- The result is a triple @(suppliedCredits, suppliedCredits', leftoverCredits)@.
-- The assertions at the end of this function check that
-- @0 <= suppliedCredits <= suppliedCredits' <= mergeDebtAsCredits mergeDebt@.
-- Presumably the first two components are the total supplied credits before
-- and after this call, and the third is the part of the supplied amount that
-- would have exceeded the total debt (see 'atomicDepositAndSpendCredits' to
-- confirm).
--
-- If the merge is already known to be completed, nothing is deposited: the
-- supplied credits are reported as equal to the total debt and all of the
-- incoming @credits@ are returned as leftovers.
--
-- The supplied amount of credits must be non-negative (checked by 'assert').
supplyCredits ::
     forall t m h. (MonadSTM m, MonadST m, MonadMVar m, MonadMask m)
  => Ref (MergingRun t m h)
  -> CreditThreshold
  -> SupplyMergeCredits
  -> m (MergeCredits, MergeCredits, MergeCredits)
       -- ^ (suppliedCredits, suppliedCredits', leftoverCredits)
supplyCredits (DeRef MergingRun {
                 mergeKnownCompleted,
                 mergeDebt,
                 mergeCreditsVar,
                 mergeState
               })
              !creditBatchThreshold
              (SupplyMergeCredits !supplyRelOrAbs !credits) =
    assert (credits >= 0) $ do
      mergeCompleted <- readMutVar mergeKnownCompleted
      case mergeCompleted of
        MergeKnownCompleted ->
          let suppliedCredits  = mergeDebtAsCredits mergeDebt -- we're completed!
              suppliedCredits' = suppliedCredits -- we can't supply more now
              leftoverCredits  = credits -- but meaningless for SupplyAbsolute
           in pure (suppliedCredits, suppliedCredits', leftoverCredits)
        MergeMaybeCompleted ->
          bracketOnError
            -- Atomically add credits to the unspent credits (but not allowing
            -- supplied credits to exceed the total debt), determine which case
            -- we're in and thus how many credits we should try to spend now on
            -- performing merge steps. Return the credits to spend now and any
            -- leftover credits that would exceed the debt limit.
            (atomicDepositAndSpendCredits
              mergeCreditsVar mergeDebt
              creditBatchThreshold
              (SupplyMergeCredits supplyRelOrAbs credits))

            -- If an exception occurs while merging (sync or async) then we
            -- reverse the spending of the credits (but not the deposit).
            (\(_, _, spendCredits, _) ->
              atomicSpendCredits mergeCreditsVar (-spendCredits))

            (\(suppliedCredits, suppliedCredits',
               spendCredits, leftoverCredits) -> do
              when (spendCredits > 0) $ do
                weFinishedMerge <-
                  performMergeSteps mergeState mergeCreditsVar spendCredits

                -- If an async exception happens before we get to perform the
                -- completion, then that is fine. The next supplyCredits will
                -- complete the merge.
                when weFinishedMerge $
                  completeMerge mergeState mergeKnownCompleted

              -- Sanity-check the ordering of the reported credit totals
              -- before handing them back to the caller.
              assert (0 <= suppliedCredits) $
                assert (suppliedCredits  <= suppliedCredits') $
                assert (suppliedCredits' <= mergeDebtAsCredits mergeDebt) $
                return (suppliedCredits, suppliedCredits', leftoverCredits))

{-# SPECIALISE performMergeSteps ::
     StrictMVar IO (MergingRunState t IO h)
  -> CreditsVar RealWorld
  -> MergeCredits
  -> IO Bool #-}
-- | Perform merge steps on an ongoing merge, one step per supplied credit.
--
-- The merging run state is held via 'withMVar' for the duration of the steps.
-- The given amount of credits must be non-negative (checked by 'assert').
--
-- Returns 'True' if and only if the call to 'Merge.steps' reported
-- 'MergeDone'. If the state is already a 'CompletedMerge', no steps are
-- performed and the result is 'False'.
performMergeSteps ::
     (MonadMVar m, MonadMask m, MonadSTM m, MonadST m)
  => StrictMVar m (MergingRunState t m h)
  -> CreditsVar (PrimState m)
  -> MergeCredits
  -> m Bool
performMergeSteps mergeVar creditsVar credits =
    assert (credits >= 0) $
    withMVar mergeVar $ \case
      CompletedMerge{}   -> pure False
      OngoingMerge _rs m -> do
        let MergeCredits stepsToDo = credits
        (stepsDone, stepResult) <- Merge.steps m stepsToDo
        assert (stepResult == MergeDone || stepsDone >= stepsToDo) (pure ())
        -- Merge.steps guarantees that @stepsDone >= stepsToDo@ /unless/ the
        -- merge was just now finished and excess credit was supplied.
        -- The latter is possible. As noted elsewhere, exceptions can result in
        -- us having done more merge steps than we accounted for with spent
        -- credits, hence it is possible when getting to the end of the merge
        -- for us to try to do more steps than there are steps possible to do.

        -- If excess merging steps were done then we must account for that.
        -- We do so by borrowing the excess from the unspent credits pot and
        -- spending them, i.e. doing a transfer from unspent to spent. This
        -- may result in the unspent credits pot becoming negative.
        let stepsExcess = MergeCredits (stepsDone - stepsToDo)
        when (stepsExcess > 0) $
          atomicSpendCredits creditsVar stepsExcess

        pure $ stepResult == MergeDone

{-# SPECIALISE completeMerge ::
     StrictMVar IO (MergingRunState t IO h)
  -> MutVar RealWorld MergeKnownCompleted
  -> IO () #-}
-- | Convert an 'OngoingMerge' to a 'CompletedMerge'.
--
-- The state transition runs inside 'modifyMVarMasked_'. If the state is
-- already a 'CompletedMerge' it is left unchanged. Otherwise the merge is
-- completed with 'Merge.complete', the references held in the 'OngoingMerge'
-- (presumably the merge's input runs) are released, and
-- 'MergeKnownCompleted' is written to the given variable.
completeMerge ::
     (MonadSTM m, MonadST m, MonadMVar m, MonadMask m)
  => StrictMVar m (MergingRunState t m h)
  -> MutVar (PrimState m) MergeKnownCompleted
  -> m ()
completeMerge mergeVar mergeKnownCompletedVar = do
    modifyMVarMasked_ mergeVar $ \case
      mrs@CompletedMerge{} -> pure $! mrs
      (OngoingMerge rs m) -> do
        -- first try to complete the merge before performing other side effects,
        -- in case the completion fails
        --TODO: Run.fromBuilder (used in Merge.complete) claims not to be
        -- exception safe so we should probably be using the resource registry
        -- and test for exception safety.
        r <- Merge.complete m
        V.forM_ rs releaseRef
        -- Cache the knowledge that we completed the merge
        writeMutVar mergeKnownCompletedVar MergeKnownCompleted
        pure $! CompletedMerge r

{-# SPECIALISE expectCompleted ::
     Ref (MergingRun t IO h)
  -> IO (Ref (Run IO h)) #-}
-- | Return the run resulting from a merge that is expected to be completed.
--
-- This does /not/ release the reference, but allocates a new reference for
-- the returned run, which must be released at some point.
--
-- If the merge is not yet known to be completed, all currently unspent
-- credits are spent on merge steps first, and the merge is completed if those
-- steps finish it. In that case it is asserted that the supplied credits
-- (spent plus unspent) equal the total merge debt.
--
-- Calls 'error' if the merge is still ongoing afterwards.
expectCompleted ::
     (MonadMVar m, MonadSTM m, MonadST m, MonadMask m)
  => Ref (MergingRun t m h) -> m (Ref (Run m h))
expectCompleted (DeRef MergingRun {..}) = do
    knownCompleted <- readMutVar mergeKnownCompleted
    -- The merge is not guaranteed to be complete, so we do the remaining steps
    when (knownCompleted == MergeMaybeCompleted) $ do
      (SpentCredits   spentCredits,
       UnspentCredits unspentCredits) <- atomicReadCredits mergeCreditsVar
      let suppliedCredits = spentCredits + unspentCredits
          !credits        = assert (MergeDebt suppliedCredits == mergeDebt) $
                            assert (unspentCredits >= 0) $
                            unspentCredits

      weFinishedMerge <- performMergeSteps mergeState mergeCreditsVar credits
      -- If an async exception happens before we get to perform the
      -- completion, then that is fine. The next 'expectCompleted' will
      -- complete the merge.
      when weFinishedMerge $ completeMerge mergeState mergeKnownCompleted
    withMVar mergeState $ \case
      CompletedMerge r -> dupRef r  -- return a fresh reference to the run
      OngoingMerge{} -> do
        -- If the algorithm finds an ongoing merge here, then it is a bug in
        -- our merge scheduling algorithm. As such, we throw a pure error.
        error "expectCompleted: expected a completed merge, but found an ongoing merge"