{-# LANGUAGE CPP             #-}
{-# LANGUAGE PatternSynonyms #-}
{-# OPTIONS_HADDOCK not-home #-}

-- | An incremental merge of multiple runs, preserving a bracketing structure.
--
module Database.LSMTree.Internal.MergingTree (
    -- $mergingtrees
    MergingTree (..)
  , PreExistingRun (..)
  , newCompletedMerge
  , newOngoingMerge
  , newPendingLevelMerge
  , newPendingUnionMerge
  , isStructurallyEmpty
  , remainingMergeDebt
  , supplyCredits
    -- * Internal state
  , MergingTreeState (..)
  , PendingMerge (..)
  ) where

import           Control.ActionRegistry
import           Control.Concurrent.Class.MonadMVar.Strict
import           Control.Exception (assert)
import           Control.Monad (foldM, (<$!>))
import           Control.Monad.Class.MonadST (MonadST)
import           Control.Monad.Class.MonadSTM (MonadSTM (..))
import           Control.Monad.Class.MonadThrow (MonadMask)
import           Control.Monad.Primitive
import           Control.RefCount
import           Data.Foldable (toList, traverse_)
#if !MIN_VERSION_base(4,20,0)
import           Data.List (foldl')
                 -- foldl' is included in the Prelude from base 4.20 onwards
#endif
import           Data.Vector (Vector)
import qualified Data.Vector as V
import           Database.LSMTree.Internal.Entry (NumEntries (..))
import           Database.LSMTree.Internal.Lookup (ResolveSerialisedValue)
import           Database.LSMTree.Internal.MergingRun (MergeDebt (..),
                     MergingRun)
import qualified Database.LSMTree.Internal.MergingRun as MR
import           Database.LSMTree.Internal.Paths (SessionRoot)
import qualified Database.LSMTree.Internal.Paths as Paths
import           Database.LSMTree.Internal.Run (Run)
import qualified Database.LSMTree.Internal.Run as Run
import           Database.LSMTree.Internal.UniqCounter
import           System.FS.API (HasFS)
import           System.FS.BlockIO.API (HasBlockIO)

-- $mergingtrees Semantically, tables are key-value stores like Haskell's
-- @Map@. Table unions then behave like @Map.unionWith (<>)@. If one of the
-- input tables contains a value at a particular key, the result will also
-- contain it. If multiple tables share that key, the values will be combined
-- monoidally.
--
-- Looking at the implementation, tables are not just key-value pairs, but
-- consist of runs. If each table was just a single run, unioning would involve
-- a run merge similar to the one used for compaction (when a level is full),
-- but with a different merge type 'MR.MergeUnion' that differs semantically:
-- Here, runs don't represent updates (overwriting each other), but they each
-- represent the full state of a table. There is no distinction between no
-- entry and a 'Delete', between an 'Insert' and a 'Mupsert'.
--
-- To union two tables, we can therefore first merge down each table into a
-- single run (using regular level merges) and then union merge these.
--
-- However, we want to spread out the work required and perform these merges
-- incrementally. At first, we only create a new table that is empty except for
-- a data structure 'MergingTree', representing the merges that need to be
-- done. The usual operations can then be performed on the table while the
-- merge is in progress: Inserts go into the table as usual, not affecting its
-- last level ('UnionLevel'), lookups need to consider the tree (requiring some
-- complexity and runtime overhead), further unions incorporate the in-progress
-- tree into the resulting one, which also shares future merging work.
--
-- It seems necessary to represent the suspended merges using a tree. Other
-- approaches don't allow for full sharing of the incremental work (e.g.
-- because they effectively \"re-bracket\" nested unions). It also seems
-- necessary to first merge each input table into a single run, as there is no
-- practical distributive property between level and union merges.


-- | A \"merging tree\" is a mutable representation of an incremental
-- tree-shaped nested merge. This makes it possible to represent union merges
-- of entire tables, each of which itself first needs to be merged to become a
-- single run.
--
-- Trees have to support arbitrarily deep nesting, since each input to 'union'
-- might already contain an in-progress merging tree (which then becomes shared
-- between multiple tables).
--
data MergingTree m h = MergingTree {
      -- | The current (mutable) state of the merge.
      mergeState      :: !(StrictMVar m (MergingTreeState m h))
      -- | The reference counter backing the 'RefCounted' instance.
    , mergeRefCounter :: !(RefCounter m)
    }

-- A 'MergingTree' is reference counted via its 'mergeRefCounter' field.
instance RefCounted m (MergingTree m h) where
    getRefCounter = mergeRefCounter

-- | The state of a 'MergingTree', as stored in its 'mergeState' variable.
data MergingTreeState m h =
    -- | The merge has been completed; only its result remains.
    CompletedTreeMerge
      !(Ref (Run m h))
      -- ^ Output run

    -- | Reuses MergingRun to allow sharing existing merges.
  | OngoingTreeMerge
      !(Ref (MergingRun MR.TreeMergeType m h))

    -- | A merge that is still waiting for its inputs; see 'PendingMerge'.
  | PendingTreeMerge
      !(PendingMerge m h)

-- | A merge that is waiting for its inputs to complete.
data PendingMerge m h =
    -- | The collection of inputs is the entire contents of a table,
    -- i.e. its (merging) runs and finally a union merge (if that table
    -- already contained a union).
    PendingLevelMerge
      !(Vector (PreExistingRun m h))
      -- ^ The table's pre-existing runs (completed or still merging)
      !(Maybe (Ref (MergingTree m h)))
      -- ^ The table's union merge, if it already contained one

    -- | Each input is the entire content of a table (as a merging tree).
  | PendingUnionMerge
      !(Vector (Ref (MergingTree m h)))
      -- ^ The merging trees to be unioned

-- | A uniform view of both kinds of 'PendingMerge': the type of merge to be
-- performed, the pre-existing runs and the child merging trees.
--
-- A 'PendingLevelMerge' has at most one child tree, a 'PendingUnionMerge' has
-- no pre-existing runs.
pendingContent ::
     PendingMerge m h
  -> ( MR.TreeMergeType
     , Vector (PreExistingRun m h)
     , Vector (Ref (MergingTree m h))
     )
pendingContent = \case
    PendingLevelMerge prs t  -> (MR.MergeLevel, prs, maybe V.empty V.singleton t)
    PendingUnionMerge    ts  -> (MR.MergeUnion, V.empty, ts)

-- | Uni-directional pattern synonym that matches any 'PendingMerge' through
-- 'pendingContent', exposing the merge type, the pre-existing runs and the
-- child trees regardless of the underlying constructor.
{-# COMPLETE PendingMerge #-}
pattern PendingMerge ::
     MR.TreeMergeType
  -> Vector (PreExistingRun m h)
  -> Vector (Ref (MergingTree m h))
  -> PendingMerge m h
pattern PendingMerge mt prs ts <- (pendingContent -> (mt, prs, ts))

-- | An input of a 'PendingLevelMerge': a run that is either already completed
-- or is still being produced by an ongoing level merge.
data PreExistingRun m h =
    PreExistingRun        !(Ref (Run m h))
    -- ^ A completed run
  | PreExistingMergingRun !(Ref (MergingRun MR.LevelMergeType m h))
    -- ^ An ongoing level merge

{-# SPECIALISE newCompletedMerge ::
     Ref (Run IO h)
  -> IO (Ref (MergingTree IO h)) #-}
-- | Create a new 'MergingTree' that is already in the 'CompletedTreeMerge'
-- state, with the given run as its output.
--
-- The given reference is duplicated, not adopted: its ownership remains with
-- the caller.
newCompletedMerge ::
     (MonadMVar m, PrimMonad m, MonadMask m)
  => Ref (Run m h)
  -> m (Ref (MergingTree m h))
newCompletedMerge run = mkMergingTree . CompletedTreeMerge =<< dupRef run

{-# SPECIALISE newOngoingMerge ::
     Ref (MergingRun MR.TreeMergeType IO h)
  -> IO (Ref (MergingTree IO h)) #-}
-- | Create a new 'MergingTree' representing the merge of an ongoing run.
-- The usage of this function is primarily to facilitate the reloading of an
-- ongoing merge from a persistent snapshot.
--
-- The given reference is duplicated, not adopted: its ownership remains with
-- the caller.
newOngoingMerge ::
     (MonadMVar m, PrimMonad m, MonadMask m)
  => Ref (MergingRun MR.TreeMergeType m h)
  -> m (Ref (MergingTree m h))
newOngoingMerge mr = mkMergingTree . OngoingTreeMerge =<< dupRef mr

{-# SPECIALISE newPendingLevelMerge ::
     [PreExistingRun IO h]
  -> Maybe (Ref (MergingTree IO h))
  -> IO (Ref (MergingTree IO h)) #-}
-- | Create a new 'MergingTree' representing the merge of a sequence of
-- pre-existing runs (completed or ongoing, plus an optional final tree).
-- This is for merging the entire contents of a table down to a single run
-- (while sharing existing ongoing merges).
--
-- Shape: if the list of runs is empty and the optional input tree is
-- structurally empty, the result will also be structurally empty. See
-- 'isStructurallyEmpty'.
--
-- Resource tracking:
-- * This allocates a new 'Ref' which the caller is responsible for releasing
--   eventually.
-- * The ownership of all input 'Ref's remains with the caller. This action
--   will create duplicate references, not adopt the given ones.
--
-- ASYNC: this should be called with asynchronous exceptions masked because it
-- allocates\/creates resources.
newPendingLevelMerge ::
     forall m h.
     (MonadMVar m, MonadMask m, PrimMonad m)
  => [PreExistingRun m h]
  -> Maybe (Ref (MergingTree m h))
  -> m (Ref (MergingTree m h))
-- With no runs, the result is just the given tree: share it directly.
newPendingLevelMerge [] (Just t) = dupRef t
newPendingLevelMerge [PreExistingRun r] Nothing = do
    -- No need to create a pending merge here.
    --
    -- We could do something similar for PreExistingMergingRun, but it's:
    -- * complicated, because of the LevelMergeType\/TreeMergeType mismatch.
    -- * unneeded, since that case should never occur. If there is only a
    --   single entry in the list, there can only be one level in the input
    --   table. At level 1 there are no merging runs, so it must be a
    --   PreExistingRun.
    r' <- dupRef r
    -- There are no interruption points here, and thus provided async
    -- exceptions are masked then there can be no async exceptions here at all.
    mkMergingTree (CompletedTreeMerge r')

newPendingLevelMerge prs mmt = do
    -- isStructurallyEmpty is an interruption point, and can receive async
    -- exceptions even when masked. So we use it first, *before* allocating
    -- new references.
    mmt' <- dupMaybeMergingTree mmt
    prs' <- traverse dupPreExistingRun (V.fromList prs)
    mkMergingTree (PendingTreeMerge (PendingLevelMerge prs' mmt'))
  where
    -- Duplicate the reference held by a pre-existing run.
    dupPreExistingRun (PreExistingRun r) =
      PreExistingRun <$!> dupRef r
    dupPreExistingRun (PreExistingMergingRun mr) =
      PreExistingMergingRun <$!> dupRef mr

    -- Duplicate the reference to the optional input tree, dropping the tree
    -- altogether if it is structurally empty.
    dupMaybeMergingTree :: Maybe (Ref (MergingTree m h))
                        -> m (Maybe (Ref (MergingTree m h)))
    dupMaybeMergingTree Nothing   = return Nothing
    dupMaybeMergingTree (Just mt) = do
      isempty <- isStructurallyEmpty mt
      if isempty
        then return Nothing
        else Just <$!> dupRef mt

{-# SPECIALISE newPendingUnionMerge ::
     [Ref (MergingTree IO h)]
  -> IO (Ref (MergingTree IO h)) #-}
-- | Create a new 'MergingTree' representing the union of one or more merging
-- trees. This is for unioning the content of multiple tables (represented
-- themselves as merging trees).
--
-- Shape: if all of the input trees are structurally empty, the result will
-- also be structurally empty. See 'isStructurallyEmpty'.
--
-- Resource tracking:
-- * This allocates a new 'Ref' which the caller is responsible for releasing
--   eventually.
-- * The ownership of all input 'Ref's remains with the caller. This action
--   will create duplicate references, not adopt the given ones.
--
-- ASYNC: this should be called with asynchronous exceptions masked because it
-- allocates\/creates resources.
newPendingUnionMerge ::
     (MonadMVar m, MonadMask m, PrimMonad m)
  => [Ref (MergingTree m h)]
  -> m (Ref (MergingTree m h))
newPendingUnionMerge mts = do
    -- Structurally empty inputs do not contribute to the union: drop them.
    mts' <- V.filterM (fmap not . isStructurallyEmpty) (V.fromList mts)
    -- isStructurallyEmpty is interruptible even with async exceptions masked,
    -- but we use it before allocating new references.
    mts'' <- V.mapM dupRef mts'
    case V.uncons mts'' of
      -- A union of a single tree is that tree itself; return the duplicated
      -- reference rather than wrapping it in a new pending merge.
      Just (mt, x) | V.null x
        -> return mt
      _ -> mkMergingTree (PendingTreeMerge (PendingUnionMerge mts''))

{-# SPECIALISE isStructurallyEmpty :: Ref (MergingTree IO h) -> IO Bool #-}
-- | Test if a 'MergingTree' is \"obviously\" empty by virtue of its structure.
-- This is not the same as being empty due to a pending or ongoing merge
-- happening to produce an empty run.
--
-- This only reads the tree's current state; see 'isStructurallyEmptyState'
-- for the actual criterion.
--
isStructurallyEmpty :: MonadMVar m => Ref (MergingTree m h) -> m Bool
isStructurallyEmpty (DeRef MergingTree {mergeState}) =
    isStructurallyEmptyState <$> readMVar mergeState

-- | Pure version of 'isStructurallyEmpty', operating on a tree's state.
--
-- Only pending merges without any inputs count as structurally empty:
-- a level merge with no runs and no input tree, or a union merge with no
-- input trees. Everything else is considered non-empty.
isStructurallyEmptyState :: MergingTreeState m h -> Bool
isStructurallyEmptyState = \case
    PendingTreeMerge (PendingLevelMerge prs Nothing) -> V.null prs
    PendingTreeMerge (PendingUnionMerge mts)         -> V.null mts
    _                                                -> False
    -- It may also turn out to be useful to consider CompletedTreeMerge with
    -- a zero-length run as empty.

{-# SPECIALISE mkMergingTree ::
     MergingTreeState IO h
  -> IO (Ref (MergingTree IO h)) #-}
-- | Constructor helper.
--
-- This adopts the references in the MergingTreeState, so callers should
-- duplicate first. This is not the normal pattern, but this is an internal
-- helper only.
--
mkMergingTree ::
     (MonadMVar m, PrimMonad m, MonadMask m)
  => MergingTreeState m h
  -> m (Ref (MergingTree m h))
mkMergingTree mergeTreeState = do
    mergeState <- newMVar mergeTreeState
    -- 'finalise' is registered as the finaliser of the new reference, so that
    -- the adopted references get released again.
    newRef (finalise mergeState) $ \mergeRefCounter ->
      MergingTree {
        mergeState
      , mergeRefCounter
      }

{-# SPECIALISE finalise :: StrictMVar IO (MergingTreeState IO h) -> IO () #-}
-- | Finaliser of a 'MergingTree' (registered by 'mkMergingTree'): read the
-- current state and release every reference it holds, i.e. the output run,
-- the ongoing merging run, or all inputs of a pending merge.
finalise :: (MonadMVar m, PrimMonad m, MonadMask m)
         => StrictMVar m (MergingTreeState m h) -> m ()
finalise mergeState = releaseMTS =<< readMVar mergeState
  where
    -- Release the references held by a tree state.
    releaseMTS (CompletedTreeMerge r) = releaseRef r
    releaseMTS (OngoingTreeMerge  mr) = releaseRef mr
    releaseMTS (PendingTreeMerge ptm) =
      case ptm of
        PendingUnionMerge mts        -> traverse_ releaseRef mts
        PendingLevelMerge prs mmt    -> traverse_ releasePER prs
                                     >> traverse_ releaseRef mmt

    -- Release the reference held by a pre-existing run.
    releasePER (PreExistingRun         r) = releaseRef r
    releasePER (PreExistingMergingRun mr) = releaseRef mr

{-# SPECIALISE remainingMergeDebt ::
     Ref (MergingTree IO h) -> IO (MergeDebt, NumEntries) #-}
-- | Calculate an upper bound on the merge credits required to complete the
-- merge, i.e. turn the tree into a 'CompletedTreeMerge'. For the recursive
-- calculation, we also return an upper bound on the size of the resulting run.
--
-- A completed tree has a debt of 0. Ongoing and pending trees have a debt of
-- at least 1, see the comment on @addDebtOne@ below.
remainingMergeDebt ::
     (MonadMVar m, PrimMonad m)
  => Ref (MergingTree m h) -> m (MergeDebt, NumEntries)
remainingMergeDebt (DeRef mt) = do
    readMVar (mergeState mt) >>= \case
      CompletedTreeMerge r -> return (MergeDebt 0, Run.size r)
      OngoingTreeMerge mr  -> addDebtOne <$> MR.remainingMergeDebt mr
      PendingTreeMerge ptm -> addDebtOne <$> remainingMergeDebtPendingMerge ptm
  where
    -- An ongoing merge should never have 0 debt, even if the 'MergingRun' in it
    -- says it is completed. We still need to update it to 'CompletedTreeMerge'.
    -- Similarly, a pending merge needs some work to complete it, even if all
    -- its inputs are empty.
    --
    -- Note that we can't use @max 1@, as this would violate the property that
    -- supplying N credits reduces the remaining debt by at least N.
    addDebtOne (MergeDebt !debt, !size) = (MergeDebt (debt + 1), size)

{-# SPECIALISE remainingMergeDebtPendingMerge ::
     PendingMerge IO h -> IO (MergeDebt, NumEntries) #-}
-- | Calculate the remaining debt (and resulting size) of a pending merge.
--
-- The debt and size of every input (pre-existing runs first, then child
-- trees) are collected and combined using 'debtOfNestedMerge'.
remainingMergeDebtPendingMerge ::
     (MonadMVar m, PrimMonad m)
  => PendingMerge m h -> m (MergeDebt, NumEntries)
remainingMergeDebtPendingMerge (PendingMerge _ prs mts) = do
    -- TODO: optimise to reduce allocations
    debtsPre <- traverse remainingMergeDebtPreExistingRun prs
    debtsChild <- traverse remainingMergeDebt mts
    return (debtOfNestedMerge (debtsPre <> debtsChild))
  where
    -- A completed run carries no debt, a merging run reports its own.
    remainingMergeDebtPreExistingRun = \case
        PreExistingRun        r  -> return (MergeDebt 0, Run.size r)
        PreExistingMergingRun mr -> MR.remainingMergeDebt mr

-- | Combine the debts and sizes of the inputs of a nested merge.
--
-- The resulting size is the sum of all input sizes. The resulting debt is the
-- sum of all input debts plus that total size, which accounts for the one
-- final merge over all (completed) inputs.
--
-- For an empty vector the result is @(MergeDebt 0, NumEntries 0)@.
debtOfNestedMerge :: Vector (MergeDebt, NumEntries) -> (MergeDebt, NumEntries)
debtOfNestedMerge debts =
    -- complete all children, then one merge of them all (so debt is their size)
    (MergeDebt (c + MR.MergeCredits n), NumEntries n)
  where
    (MergeDebt c, NumEntries n) = foldl' add (MergeDebt 0, NumEntries 0) debts

    -- Component-wise addition; the bang patterns keep both accumulators
    -- evaluated during the strict fold.
    add (MergeDebt !d1, NumEntries !n1) (MergeDebt !d2, NumEntries !n2) =
        (MergeDebt (d1 + d2), NumEntries (n1 + n2))

{-# SPECIALISE supplyCredits ::
     HasFS IO h
  -> HasBlockIO IO h
  -> ResolveSerialisedValue
  -> Run.RunParams
  -> MR.CreditThreshold
  -> SessionRoot
  -> UniqCounter IO
  -> Ref (MergingTree IO h)
  -> MR.MergeCredits
  -> IO MR.MergeCredits #-}
-- | Supply merge credits to a merging tree and return the credits that were
-- left over.
--
-- Supplying a non-positive number of credits does nothing and returns @0@.
-- Otherwise the credits are pushed down the tree: pending merges forward them
-- to their inputs, and once all inputs are completed the pending merge is
-- turned into an ongoing merge, which in turn becomes a completed merge once
-- it has received enough credits.
--
-- New runs are created in the active directory of the given 'SessionRoot',
-- using run numbers drawn from the given 'UniqCounter'.
supplyCredits ::
     forall m h.
     (MonadMVar m, MonadST m, MonadSTM m, MonadMask m)
  => HasFS m h
  -> HasBlockIO m h
  -> ResolveSerialisedValue
  -> Run.RunParams
  -> MR.CreditThreshold
  -> SessionRoot
  -> UniqCounter m
  -> Ref (MergingTree m h)
  -> MR.MergeCredits
  -> m MR.MergeCredits
supplyCredits hfs hbio resolve runParams threshold root uc = \mt0 c0 -> do
    if c0 <= 0
      then return 0
      else supplyTree mt0 c0
  where
    -- Paths for a new run in the active directory, with a fresh run number.
    mkFreshRunPaths = do
        runNumber <- uniqueToRunNumber <$> incrUniqCounter uc
        return (Paths.runPath (Paths.activeDir root) runNumber)

    -- Supply credits to a single tree, holding its state 'MVar' for the
    -- duration of the call. The state update and the registered actions are
    -- handled by 'modifyWithActionRegistry'.
    supplyTree =
        MR.supplyChecked remainingMergeDebt $ \(DeRef mt) credits ->
          -- TODO: This locks the tree for everyone, for the entire call.
          -- Lookups have to wait until supplyCredits is done.
          -- It should be enough to take the lock only to turn a pending into
          -- an ongoing or ongoing into completed tree, very briefly.
          modifyWithActionRegistry
            (takeMVar (mergeState mt))
            (putMVar (mergeState mt))
            (\reg state -> supplyState reg state credits)

    -- Advance a tree state with the given credits. Returns the (possibly new)
    -- state together with the leftover credits.
    supplyState reg state credits =
        case state of
          CompletedTreeMerge _ ->
            return (state, credits)

          OngoingTreeMerge mr -> do
            leftovers <- MR.supplyCreditsRelative mr threshold credits
            if leftovers <= 0
              then
                return (state, 0)
              else do
                -- complete ongoing merge
                r <- withRollback reg (MR.expectCompleted mr) releaseRef
                delayedCommit reg (releaseRef mr)
                -- all work is done, we can't spend any more credits
                return (CompletedTreeMerge r, leftovers)

          PendingTreeMerge _
            | isStructurallyEmptyState state -> do
            -- make a completely fresh empty run. this can only happen at the
            -- root. the structurally empty tree still has debt 1, so we want to
            -- merge it into a single run.
            -- we handle this as a special case here since in several places
            -- below we require the list of children to be non-empty.
            runPaths <- mkFreshRunPaths
            run <-
              withRollback reg
                -- TODO: the builder's handles aren't cleaned up if we fail
                -- before fromBuilder closes them
                (Run.newEmpty hfs hbio runParams runPaths)
                releaseRef
            return (CompletedTreeMerge run, credits)

          PendingTreeMerge pm -> do
            leftovers <- supplyPending pm credits
            if leftovers <= 0
              then
                -- still remaining work in children, we can't do more for now
                return (state, leftovers)
              else do
                -- all children must be done, create new merge!
                state' <- startPendingMerge reg pm
                -- use any remaining credits to progress the new merge
                supplyState reg state' leftovers

    -- Distribute credits over the inputs of a pending merge: level merges are
    -- supplied strictly left to right (pre-existing runs first, then the
    -- optional child tree), union merges get an approximately equal share per
    -- child.
    supplyPending ::
         PendingMerge m h -> MR.MergeCredits -> m MR.MergeCredits
    supplyPending =
        MR.supplyChecked remainingMergeDebtPendingMerge $ \pm credits -> do
          case pm of
            PendingLevelMerge prs mt ->
              leftToRight supplyPreExisting (V.toList prs) credits
                >>= leftToRight (flip supplyTree) (toList mt)
            PendingUnionMerge mts ->
              splitEqually (flip supplyTree) (V.toList mts) credits

    supplyPreExisting c = \case
        PreExistingRun _r        -> return c  -- no work to do, all leftovers
        PreExistingMergingRun mr -> MR.supplyCreditsRelative mr threshold c

    -- supply credits left to right until they are used up
    leftToRight ::
         (MR.MergeCredits -> a -> m MR.MergeCredits)
      -> [a] -> MR.MergeCredits -> m MR.MergeCredits
    leftToRight _ _      0 = return 0
    leftToRight _ []     c = return c
    leftToRight f (x:xs) c = f c x >>= leftToRight f xs

    -- approximately equal, being more precise would require more iterations
    splitEqually ::
         (MR.MergeCredits -> a -> m MR.MergeCredits)
      -> [a] -> MR.MergeCredits -> m MR.MergeCredits
    splitEqually f xs (MR.MergeCredits credits) =
        -- first give each tree k = ceil(credits/n) credits (last ones might
        -- get less).
        -- it's important we fold here to collect leftovers.
        -- any remainders go left to right.
        foldM supplyNth (MR.MergeCredits credits) xs >>= leftToRight f xs
      where
        -- NOTE: both bindings are strict, and @k@ divides by @n@, so this
        -- relies on @xs@ being non-empty.
        !n = length xs
        !k = MR.MergeCredits ((credits + (n - 1)) `div` n)

        -- @c@ is the number of credits still available; each element gets at
        -- most @k@ of them and whatever it does not use flows back.
        supplyNth 0 _ = return 0
        supplyNth c t = do
            let creditsToSpend = min k c
            leftovers <- f creditsToSpend t
            return (c - creditsToSpend + leftovers)

    -- Turn a pending merge whose inputs are all completed into an ongoing
    -- merge over the resulting runs.
    startPendingMerge reg pm = do
        (mergeType, rs) <- expectCompletedChildren reg pm
        assert (V.length rs > 0) $ pure ()
        runPaths <- mkFreshRunPaths
        mr <-
          withRollback reg
            (MR.new hfs hbio resolve runParams mergeType runPaths rs)
            releaseRef
        -- no need for the runs anymore, 'MR.new' made duplicates
        traverse_ (\r -> delayedCommit reg (releaseRef r)) rs
        return (OngoingTreeMerge mr)

    -- Child references are released using 'delayedCommit', so they get released
    -- if the whole supply operation runs successfully (so the pending merge
    -- is replaced).
    --
    -- Returned references are registered in the ActionRegistry, so they will
    -- get released in case of an exception.
    expectCompletedChildren ::
         ActionRegistry m
      -> PendingMerge m h
      -> m (MR.TreeMergeType, Vector (Ref (Run m h)))
    expectCompletedChildren reg (PendingMerge ty prs mts) = do
        rs1 <- V.forM prs $ \case
          PreExistingRun r -> do
            delayedCommit reg (releaseRef r)  -- only released at the end
            withRollback reg (dupRef r) releaseRef
          PreExistingMergingRun mr -> do
            delayedCommit reg (releaseRef mr)  -- only released at the end
            withRollback reg (MR.expectCompleted mr) releaseRef
        rs2 <- V.forM mts $ \mt -> do
          delayedCommit reg (releaseRef mt)  -- only released at the end
          withRollback reg (expectCompleted mt) releaseRef
        return (ty, rs1 <> rs2)

-- | Obtain the run of a merging tree that is expected to be completed.
--
-- This does /not/ release the reference, but allocates a new reference for
-- the returned run, which must be released at some point.
--
-- For a completed tree the run reference is duplicated; for an ongoing merge
-- the request is delegated to 'MR.expectCompleted'. Calling this on a tree
-- that is still pending is a usage error and fails with 'error'.
--
-- The state 'MVar' of the tree is held while the run is obtained.
expectCompleted ::
     (MonadMVar m, MonadSTM m, MonadST m, MonadMask m)
  => Ref (MergingTree m h) -> m (Ref (Run m h))
expectCompleted (DeRef MergingTree {..}) = do
    withMVar mergeState $ \case
      CompletedTreeMerge r -> dupRef r  -- return a fresh reference to the run
      OngoingTreeMerge mr  -> MR.expectCompleted mr
      PendingTreeMerge{}   ->
        error "expectCompleted: expected a completed merging tree, but found a pending one"