{-# LANGUAGE CPP #-}
{-# LANGUAGE PatternSynonyms #-}
{-# OPTIONS_HADDOCK not-home #-}
module Database.LSMTree.Internal.MergingTree (
MergingTree (..)
, PreExistingRun (..)
, newCompletedMerge
, newOngoingMerge
, newPendingLevelMerge
, newPendingUnionMerge
, isStructurallyEmpty
, remainingMergeDebt
, supplyCredits
, MergingTreeState (..)
, PendingMerge (..)
) where
import Control.ActionRegistry
import Control.Concurrent.Class.MonadMVar.Strict
import Control.Exception (assert)
import Control.Monad (foldM, (<$!>))
import Control.Monad.Class.MonadST (MonadST)
import Control.Monad.Class.MonadSTM (MonadSTM (..))
import Control.Monad.Class.MonadThrow (MonadMask)
import Control.Monad.Primitive
import Control.RefCount
import Data.Foldable (toList, traverse_)
#if !MIN_VERSION_base(4,20,0)
import Data.List (foldl')
#endif
import Data.Vector (Vector)
import qualified Data.Vector as V
import Database.LSMTree.Internal.Entry (NumEntries (..))
import Database.LSMTree.Internal.Lookup (ResolveSerialisedValue)
import Database.LSMTree.Internal.MergingRun (MergeDebt (..),
MergingRun)
import qualified Database.LSMTree.Internal.MergingRun as MR
import Database.LSMTree.Internal.Paths (SessionRoot)
import qualified Database.LSMTree.Internal.Paths as Paths
import Database.LSMTree.Internal.Run (Run)
import qualified Database.LSMTree.Internal.Run as Run
import Database.LSMTree.Internal.UniqCounter
import System.FS.API (HasFS)
import System.FS.BlockIO.API (HasBlockIO)
data MergingTree m h = MergingTree {
forall (m :: * -> *) h.
MergingTree m h -> StrictMVar m (MergingTreeState m h)
mergeState :: !(StrictMVar m (MergingTreeState m h))
, forall (m :: * -> *) h. MergingTree m h -> RefCounter m
mergeRefCounter :: !(RefCounter m)
}
instance RefCounted m (MergingTree m h) where
getRefCounter :: MergingTree m h -> RefCounter m
getRefCounter = MergingTree m h -> RefCounter m
forall (m :: * -> *) h. MergingTree m h -> RefCounter m
mergeRefCounter
data MergingTreeState m h =
CompletedTreeMerge
!(Ref (Run m h))
| OngoingTreeMerge
!(Ref (MergingRun MR.TreeMergeType m h))
| PendingTreeMerge
!(PendingMerge m h)
data PendingMerge m h =
PendingLevelMerge
!(Vector (PreExistingRun m h))
!(Maybe (Ref (MergingTree m h)))
| PendingUnionMerge
!(Vector (Ref (MergingTree m h)))
pendingContent ::
PendingMerge m h
-> ( MR.TreeMergeType
, Vector (PreExistingRun m h)
, Vector (Ref (MergingTree m h))
)
pendingContent :: forall (m :: * -> *) h.
PendingMerge m h
-> (TreeMergeType, Vector (PreExistingRun m h),
Vector (Ref (MergingTree m h)))
pendingContent = \case
PendingLevelMerge Vector (PreExistingRun m h)
prs Maybe (Ref (MergingTree m h))
t -> (TreeMergeType
MR.MergeLevel, Vector (PreExistingRun m h)
prs, Vector (Ref (MergingTree m h))
-> (Ref (MergingTree m h) -> Vector (Ref (MergingTree m h)))
-> Maybe (Ref (MergingTree m h))
-> Vector (Ref (MergingTree m h))
forall b a. b -> (a -> b) -> Maybe a -> b
maybe Vector (Ref (MergingTree m h))
forall a. Vector a
V.empty Ref (MergingTree m h) -> Vector (Ref (MergingTree m h))
forall a. a -> Vector a
V.singleton Maybe (Ref (MergingTree m h))
t)
PendingUnionMerge Vector (Ref (MergingTree m h))
ts -> (TreeMergeType
MR.MergeUnion, Vector (PreExistingRun m h)
forall a. Vector a
V.empty, Vector (Ref (MergingTree m h))
ts)
{-# COMPLETE PendingMerge #-}
pattern PendingMerge ::
MR.TreeMergeType
-> Vector (PreExistingRun m h)
-> Vector (Ref (MergingTree m h))
-> PendingMerge m h
pattern $mPendingMerge :: forall {r} {m :: * -> *} {h}.
PendingMerge m h
-> (TreeMergeType
-> Vector (PreExistingRun m h)
-> Vector (Ref (MergingTree m h))
-> r)
-> ((# #) -> r)
-> r
PendingMerge mt prs ts <- (pendingContent -> (mt, prs, ts))
data PreExistingRun m h =
PreExistingRun !(Ref (Run m h))
| PreExistingMergingRun !(Ref (MergingRun MR.LevelMergeType m h))
{-# SPECIALISE newCompletedMerge ::
Ref (Run IO h)
-> IO (Ref (MergingTree IO h)) #-}
newCompletedMerge ::
(MonadMVar m, PrimMonad m, MonadMask m)
=> Ref (Run m h)
-> m (Ref (MergingTree m h))
newCompletedMerge :: forall (m :: * -> *) h.
(MonadMVar m, PrimMonad m, MonadMask m) =>
Ref (Run m h) -> m (Ref (MergingTree m h))
newCompletedMerge Ref (Run m h)
run = MergingTreeState m h -> m (Ref (MergingTree m h))
forall (m :: * -> *) h.
(MonadMVar m, PrimMonad m, MonadMask m) =>
MergingTreeState m h -> m (Ref (MergingTree m h))
mkMergingTree (MergingTreeState m h -> m (Ref (MergingTree m h)))
-> (Ref (Run m h) -> MergingTreeState m h)
-> Ref (Run m h)
-> m (Ref (MergingTree m h))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Ref (Run m h) -> MergingTreeState m h
forall (m :: * -> *) h. Ref (Run m h) -> MergingTreeState m h
CompletedTreeMerge (Ref (Run m h) -> m (Ref (MergingTree m h)))
-> m (Ref (Run m h)) -> m (Ref (MergingTree m h))
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Ref (Run m h) -> m (Ref (Run m h))
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadThrow m, HasCallStack) =>
Ref obj -> m (Ref obj)
dupRef Ref (Run m h)
run
{-# SPECIALISE newOngoingMerge ::
Ref (MergingRun MR.TreeMergeType IO h)
-> IO (Ref (MergingTree IO h)) #-}
newOngoingMerge ::
(MonadMVar m, PrimMonad m, MonadMask m)
=> Ref (MergingRun MR.TreeMergeType m h)
-> m (Ref (MergingTree m h))
newOngoingMerge :: forall (m :: * -> *) h.
(MonadMVar m, PrimMonad m, MonadMask m) =>
Ref (MergingRun TreeMergeType m h) -> m (Ref (MergingTree m h))
newOngoingMerge Ref (MergingRun TreeMergeType m h)
mr = MergingTreeState m h -> m (Ref (MergingTree m h))
forall (m :: * -> *) h.
(MonadMVar m, PrimMonad m, MonadMask m) =>
MergingTreeState m h -> m (Ref (MergingTree m h))
mkMergingTree (MergingTreeState m h -> m (Ref (MergingTree m h)))
-> (Ref (MergingRun TreeMergeType m h) -> MergingTreeState m h)
-> Ref (MergingRun TreeMergeType m h)
-> m (Ref (MergingTree m h))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Ref (MergingRun TreeMergeType m h) -> MergingTreeState m h
forall (m :: * -> *) h.
Ref (MergingRun TreeMergeType m h) -> MergingTreeState m h
OngoingTreeMerge (Ref (MergingRun TreeMergeType m h) -> m (Ref (MergingTree m h)))
-> m (Ref (MergingRun TreeMergeType m h))
-> m (Ref (MergingTree m h))
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Ref (MergingRun TreeMergeType m h)
-> m (Ref (MergingRun TreeMergeType m h))
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadThrow m, HasCallStack) =>
Ref obj -> m (Ref obj)
dupRef Ref (MergingRun TreeMergeType m h)
mr
{-# SPECIALISE newPendingLevelMerge ::
[PreExistingRun IO h]
-> Maybe (Ref (MergingTree IO h))
-> IO (Ref (MergingTree IO h)) #-}
newPendingLevelMerge ::
forall m h.
(MonadMVar m, MonadMask m, PrimMonad m)
=> [PreExistingRun m h]
-> Maybe (Ref (MergingTree m h))
-> m (Ref (MergingTree m h))
newPendingLevelMerge :: forall (m :: * -> *) h.
(MonadMVar m, MonadMask m, PrimMonad m) =>
[PreExistingRun m h]
-> Maybe (Ref (MergingTree m h)) -> m (Ref (MergingTree m h))
newPendingLevelMerge [] (Just Ref (MergingTree m h)
t) = Ref (MergingTree m h) -> m (Ref (MergingTree m h))
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadThrow m, HasCallStack) =>
Ref obj -> m (Ref obj)
dupRef Ref (MergingTree m h)
t
newPendingLevelMerge [PreExistingRun Ref (Run m h)
r] Maybe (Ref (MergingTree m h))
Nothing = do
Ref (Run m h)
r' <- Ref (Run m h) -> m (Ref (Run m h))
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadThrow m, HasCallStack) =>
Ref obj -> m (Ref obj)
dupRef Ref (Run m h)
r
MergingTreeState m h -> m (Ref (MergingTree m h))
forall (m :: * -> *) h.
(MonadMVar m, PrimMonad m, MonadMask m) =>
MergingTreeState m h -> m (Ref (MergingTree m h))
mkMergingTree (Ref (Run m h) -> MergingTreeState m h
forall (m :: * -> *) h. Ref (Run m h) -> MergingTreeState m h
CompletedTreeMerge Ref (Run m h)
r')
newPendingLevelMerge [PreExistingRun m h]
prs Maybe (Ref (MergingTree m h))
mmt = do
Maybe (Ref (MergingTree m h))
mmt' <- Maybe (Ref (MergingTree m h)) -> m (Maybe (Ref (MergingTree m h)))
dupMaybeMergingTree Maybe (Ref (MergingTree m h))
mmt
Vector (PreExistingRun m h)
prs' <- (PreExistingRun m h -> m (PreExistingRun m h))
-> Vector (PreExistingRun m h) -> m (Vector (PreExistingRun m h))
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> Vector a -> f (Vector b)
traverse PreExistingRun m h -> m (PreExistingRun m h)
forall {m :: * -> *} {h}.
(PrimMonad m, MonadThrow m) =>
PreExistingRun m h -> m (PreExistingRun m h)
dupPreExistingRun ([PreExistingRun m h] -> Vector (PreExistingRun m h)
forall a. [a] -> Vector a
V.fromList [PreExistingRun m h]
prs)
MergingTreeState m h -> m (Ref (MergingTree m h))
forall (m :: * -> *) h.
(MonadMVar m, PrimMonad m, MonadMask m) =>
MergingTreeState m h -> m (Ref (MergingTree m h))
mkMergingTree (PendingMerge m h -> MergingTreeState m h
forall (m :: * -> *) h. PendingMerge m h -> MergingTreeState m h
PendingTreeMerge (Vector (PreExistingRun m h)
-> Maybe (Ref (MergingTree m h)) -> PendingMerge m h
forall (m :: * -> *) h.
Vector (PreExistingRun m h)
-> Maybe (Ref (MergingTree m h)) -> PendingMerge m h
PendingLevelMerge Vector (PreExistingRun m h)
prs' Maybe (Ref (MergingTree m h))
mmt'))
where
dupPreExistingRun :: PreExistingRun m h -> m (PreExistingRun m h)
dupPreExistingRun (PreExistingRun Ref (Run m h)
r) =
Ref (Run m h) -> PreExistingRun m h
forall (m :: * -> *) h. Ref (Run m h) -> PreExistingRun m h
PreExistingRun (Ref (Run m h) -> PreExistingRun m h)
-> m (Ref (Run m h)) -> m (PreExistingRun m h)
forall (m :: * -> *) a b. Monad m => (a -> b) -> m a -> m b
<$!> Ref (Run m h) -> m (Ref (Run m h))
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadThrow m, HasCallStack) =>
Ref obj -> m (Ref obj)
dupRef Ref (Run m h)
r
dupPreExistingRun (PreExistingMergingRun Ref (MergingRun LevelMergeType m h)
mr) =
Ref (MergingRun LevelMergeType m h) -> PreExistingRun m h
forall (m :: * -> *) h.
Ref (MergingRun LevelMergeType m h) -> PreExistingRun m h
PreExistingMergingRun (Ref (MergingRun LevelMergeType m h) -> PreExistingRun m h)
-> m (Ref (MergingRun LevelMergeType m h))
-> m (PreExistingRun m h)
forall (m :: * -> *) a b. Monad m => (a -> b) -> m a -> m b
<$!> Ref (MergingRun LevelMergeType m h)
-> m (Ref (MergingRun LevelMergeType m h))
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadThrow m, HasCallStack) =>
Ref obj -> m (Ref obj)
dupRef Ref (MergingRun LevelMergeType m h)
mr
dupMaybeMergingTree :: Maybe (Ref (MergingTree m h))
-> m (Maybe (Ref (MergingTree m h)))
dupMaybeMergingTree :: Maybe (Ref (MergingTree m h)) -> m (Maybe (Ref (MergingTree m h)))
dupMaybeMergingTree Maybe (Ref (MergingTree m h))
Nothing = Maybe (Ref (MergingTree m h)) -> m (Maybe (Ref (MergingTree m h)))
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe (Ref (MergingTree m h))
forall a. Maybe a
Nothing
dupMaybeMergingTree (Just Ref (MergingTree m h)
mt) = do
Bool
isempty <- Ref (MergingTree m h) -> m Bool
forall (m :: * -> *) h.
MonadMVar m =>
Ref (MergingTree m h) -> m Bool
isStructurallyEmpty Ref (MergingTree m h)
mt
if Bool
isempty
then Maybe (Ref (MergingTree m h)) -> m (Maybe (Ref (MergingTree m h)))
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe (Ref (MergingTree m h))
forall a. Maybe a
Nothing
else Ref (MergingTree m h) -> Maybe (Ref (MergingTree m h))
forall a. a -> Maybe a
Just (Ref (MergingTree m h) -> Maybe (Ref (MergingTree m h)))
-> m (Ref (MergingTree m h)) -> m (Maybe (Ref (MergingTree m h)))
forall (m :: * -> *) a b. Monad m => (a -> b) -> m a -> m b
<$!> Ref (MergingTree m h) -> m (Ref (MergingTree m h))
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadThrow m, HasCallStack) =>
Ref obj -> m (Ref obj)
dupRef Ref (MergingTree m h)
mt
{-# SPECIALISE newPendingUnionMerge ::
[Ref (MergingTree IO h)]
-> IO (Ref (MergingTree IO h)) #-}
newPendingUnionMerge ::
(MonadMVar m, MonadMask m, PrimMonad m)
=> [Ref (MergingTree m h)]
-> m (Ref (MergingTree m h))
newPendingUnionMerge :: forall (m :: * -> *) h.
(MonadMVar m, MonadMask m, PrimMonad m) =>
[Ref (MergingTree m h)] -> m (Ref (MergingTree m h))
newPendingUnionMerge [Ref (MergingTree m h)]
mts = do
Vector (Ref (MergingTree m h))
mts' <- (Ref (MergingTree m h) -> m Bool)
-> Vector (Ref (MergingTree m h))
-> m (Vector (Ref (MergingTree m h)))
forall (m :: * -> *) a.
Monad m =>
(a -> m Bool) -> Vector a -> m (Vector a)
V.filterM ((Bool -> Bool) -> m Bool -> m Bool
forall a b. (a -> b) -> m a -> m b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Bool -> Bool
not (m Bool -> m Bool)
-> (Ref (MergingTree m h) -> m Bool)
-> Ref (MergingTree m h)
-> m Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Ref (MergingTree m h) -> m Bool
forall (m :: * -> *) h.
MonadMVar m =>
Ref (MergingTree m h) -> m Bool
isStructurallyEmpty) ([Ref (MergingTree m h)] -> Vector (Ref (MergingTree m h))
forall a. [a] -> Vector a
V.fromList [Ref (MergingTree m h)]
mts)
Vector (Ref (MergingTree m h))
mts'' <- (Ref (MergingTree m h) -> m (Ref (MergingTree m h)))
-> Vector (Ref (MergingTree m h))
-> m (Vector (Ref (MergingTree m h)))
forall (m :: * -> *) a b.
Monad m =>
(a -> m b) -> Vector a -> m (Vector b)
V.mapM Ref (MergingTree m h) -> m (Ref (MergingTree m h))
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadThrow m, HasCallStack) =>
Ref obj -> m (Ref obj)
dupRef Vector (Ref (MergingTree m h))
mts'
case Vector (Ref (MergingTree m h))
-> Maybe (Ref (MergingTree m h), Vector (Ref (MergingTree m h)))
forall a. Vector a -> Maybe (a, Vector a)
V.uncons Vector (Ref (MergingTree m h))
mts'' of
Just (Ref (MergingTree m h)
mt, Vector (Ref (MergingTree m h))
x) | Vector (Ref (MergingTree m h)) -> Bool
forall a. Vector a -> Bool
V.null Vector (Ref (MergingTree m h))
x
-> Ref (MergingTree m h) -> m (Ref (MergingTree m h))
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Ref (MergingTree m h)
mt
Maybe (Ref (MergingTree m h), Vector (Ref (MergingTree m h)))
_ -> MergingTreeState m h -> m (Ref (MergingTree m h))
forall (m :: * -> *) h.
(MonadMVar m, PrimMonad m, MonadMask m) =>
MergingTreeState m h -> m (Ref (MergingTree m h))
mkMergingTree (PendingMerge m h -> MergingTreeState m h
forall (m :: * -> *) h. PendingMerge m h -> MergingTreeState m h
PendingTreeMerge (Vector (Ref (MergingTree m h)) -> PendingMerge m h
forall (m :: * -> *) h.
Vector (Ref (MergingTree m h)) -> PendingMerge m h
PendingUnionMerge Vector (Ref (MergingTree m h))
mts''))
{-# SPECIALISE isStructurallyEmpty :: Ref (MergingTree IO h) -> IO Bool #-}
isStructurallyEmpty :: MonadMVar m => Ref (MergingTree m h) -> m Bool
isStructurallyEmpty :: forall (m :: * -> *) h.
MonadMVar m =>
Ref (MergingTree m h) -> m Bool
isStructurallyEmpty (DeRef MergingTree {StrictMVar m (MergingTreeState m h)
mergeState :: forall (m :: * -> *) h.
MergingTree m h -> StrictMVar m (MergingTreeState m h)
mergeState :: StrictMVar m (MergingTreeState m h)
mergeState}) =
MergingTreeState m h -> Bool
forall (m :: * -> *) h. MergingTreeState m h -> Bool
isStructurallyEmptyState (MergingTreeState m h -> Bool)
-> m (MergingTreeState m h) -> m Bool
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> StrictMVar m (MergingTreeState m h) -> m (MergingTreeState m h)
forall (m :: * -> *) a. MonadMVar m => StrictMVar m a -> m a
readMVar StrictMVar m (MergingTreeState m h)
mergeState
isStructurallyEmptyState :: MergingTreeState m h -> Bool
isStructurallyEmptyState :: forall (m :: * -> *) h. MergingTreeState m h -> Bool
isStructurallyEmptyState = \case
PendingTreeMerge (PendingLevelMerge Vector (PreExistingRun m h)
prs Maybe (Ref (MergingTree m h))
Nothing) -> Vector (PreExistingRun m h) -> Bool
forall a. Vector a -> Bool
V.null Vector (PreExistingRun m h)
prs
PendingTreeMerge (PendingUnionMerge Vector (Ref (MergingTree m h))
mts) -> Vector (Ref (MergingTree m h)) -> Bool
forall a. Vector a -> Bool
V.null Vector (Ref (MergingTree m h))
mts
MergingTreeState m h
_ -> Bool
False
{-# SPECIALISE mkMergingTree ::
MergingTreeState IO h
-> IO (Ref (MergingTree IO h)) #-}
mkMergingTree ::
(MonadMVar m, PrimMonad m, MonadMask m)
=> MergingTreeState m h
-> m (Ref (MergingTree m h))
mkMergingTree :: forall (m :: * -> *) h.
(MonadMVar m, PrimMonad m, MonadMask m) =>
MergingTreeState m h -> m (Ref (MergingTree m h))
mkMergingTree MergingTreeState m h
mergeTreeState = do
StrictMVar m (MergingTreeState m h)
mergeState <- MergingTreeState m h -> m (StrictMVar m (MergingTreeState m h))
forall (m :: * -> *) a. MonadMVar m => a -> m (StrictMVar m a)
newMVar MergingTreeState m h
mergeTreeState
m ()
-> (RefCounter m -> MergingTree m h) -> m (Ref (MergingTree m h))
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, HasCallStack) =>
m () -> (RefCounter m -> obj) -> m (Ref obj)
newRef (StrictMVar m (MergingTreeState m h) -> m ()
forall (m :: * -> *) h.
(MonadMVar m, PrimMonad m, MonadMask m) =>
StrictMVar m (MergingTreeState m h) -> m ()
finalise StrictMVar m (MergingTreeState m h)
mergeState) ((RefCounter m -> MergingTree m h) -> m (Ref (MergingTree m h)))
-> (RefCounter m -> MergingTree m h) -> m (Ref (MergingTree m h))
forall a b. (a -> b) -> a -> b
$ \RefCounter m
mergeRefCounter ->
MergingTree {
StrictMVar m (MergingTreeState m h)
mergeState :: StrictMVar m (MergingTreeState m h)
mergeState :: StrictMVar m (MergingTreeState m h)
mergeState
, RefCounter m
mergeRefCounter :: RefCounter m
mergeRefCounter :: RefCounter m
mergeRefCounter
}
{-# SPECIALISE finalise :: StrictMVar IO (MergingTreeState IO h) -> IO () #-}
finalise :: (MonadMVar m, PrimMonad m, MonadMask m)
=> StrictMVar m (MergingTreeState m h) -> m ()
finalise :: forall (m :: * -> *) h.
(MonadMVar m, PrimMonad m, MonadMask m) =>
StrictMVar m (MergingTreeState m h) -> m ()
finalise StrictMVar m (MergingTreeState m h)
mergeState = MergingTreeState m h -> m ()
forall {m :: * -> *} {h}.
(PrimMonad m, MonadMask m) =>
MergingTreeState m h -> m ()
releaseMTS (MergingTreeState m h -> m ()) -> m (MergingTreeState m h) -> m ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< StrictMVar m (MergingTreeState m h) -> m (MergingTreeState m h)
forall (m :: * -> *) a. MonadMVar m => StrictMVar m a -> m a
readMVar StrictMVar m (MergingTreeState m h)
mergeState
where
releaseMTS :: MergingTreeState m h -> m ()
releaseMTS (CompletedTreeMerge Ref (Run m h)
r) = Ref (Run m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef Ref (Run m h)
r
releaseMTS (OngoingTreeMerge Ref (MergingRun TreeMergeType m h)
mr) = Ref (MergingRun TreeMergeType m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef Ref (MergingRun TreeMergeType m h)
mr
releaseMTS (PendingTreeMerge PendingMerge m h
ptm) =
case PendingMerge m h
ptm of
PendingUnionMerge Vector (Ref (MergingTree m h))
mts -> (Ref (MergingTree m h) -> m ())
-> Vector (Ref (MergingTree m h)) -> m ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ Ref (MergingTree m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef Vector (Ref (MergingTree m h))
mts
PendingLevelMerge Vector (PreExistingRun m h)
prs Maybe (Ref (MergingTree m h))
mmt -> (PreExistingRun m h -> m ()) -> Vector (PreExistingRun m h) -> m ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ PreExistingRun m h -> m ()
forall {m :: * -> *} {h}.
(PrimMonad m, MonadMask m) =>
PreExistingRun m h -> m ()
releasePER Vector (PreExistingRun m h)
prs
m () -> m () -> m ()
forall a b. m a -> m b -> m b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> (Ref (MergingTree m h) -> m ())
-> Maybe (Ref (MergingTree m h)) -> m ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ Ref (MergingTree m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef Maybe (Ref (MergingTree m h))
mmt
releasePER :: PreExistingRun m h -> m ()
releasePER (PreExistingRun Ref (Run m h)
r) = Ref (Run m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef Ref (Run m h)
r
releasePER (PreExistingMergingRun Ref (MergingRun LevelMergeType m h)
mr) = Ref (MergingRun LevelMergeType m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef Ref (MergingRun LevelMergeType m h)
mr
{-# SPECIALISE remainingMergeDebt ::
Ref (MergingTree IO h) -> IO (MergeDebt, NumEntries) #-}
remainingMergeDebt ::
(MonadMVar m, PrimMonad m)
=> Ref (MergingTree m h) -> m (MergeDebt, NumEntries)
remainingMergeDebt :: forall (m :: * -> *) h.
(MonadMVar m, PrimMonad m) =>
Ref (MergingTree m h) -> m (MergeDebt, NumEntries)
remainingMergeDebt (DeRef MergingTree m h
mt) = do
StrictMVar m (MergingTreeState m h) -> m (MergingTreeState m h)
forall (m :: * -> *) a. MonadMVar m => StrictMVar m a -> m a
readMVar (MergingTree m h -> StrictMVar m (MergingTreeState m h)
forall (m :: * -> *) h.
MergingTree m h -> StrictMVar m (MergingTreeState m h)
mergeState MergingTree m h
mt) m (MergingTreeState m h)
-> (MergingTreeState m h -> m (MergeDebt, NumEntries))
-> m (MergeDebt, NumEntries)
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
CompletedTreeMerge Ref (Run m h)
r -> (MergeDebt, NumEntries) -> m (MergeDebt, NumEntries)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (MergeCredits -> MergeDebt
MergeDebt MergeCredits
0, Ref (Run m h) -> NumEntries
forall (m :: * -> *) h. Ref (Run m h) -> NumEntries
Run.size Ref (Run m h)
r)
OngoingTreeMerge Ref (MergingRun TreeMergeType m h)
mr -> (MergeDebt, NumEntries) -> (MergeDebt, NumEntries)
forall {b}. (MergeDebt, b) -> (MergeDebt, b)
addDebtOne ((MergeDebt, NumEntries) -> (MergeDebt, NumEntries))
-> m (MergeDebt, NumEntries) -> m (MergeDebt, NumEntries)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Ref (MergingRun TreeMergeType m h) -> m (MergeDebt, NumEntries)
forall (m :: * -> *) t h.
(MonadMVar m, PrimMonad m) =>
Ref (MergingRun t m h) -> m (MergeDebt, NumEntries)
MR.remainingMergeDebt Ref (MergingRun TreeMergeType m h)
mr
PendingTreeMerge PendingMerge m h
ptm -> (MergeDebt, NumEntries) -> (MergeDebt, NumEntries)
forall {b}. (MergeDebt, b) -> (MergeDebt, b)
addDebtOne ((MergeDebt, NumEntries) -> (MergeDebt, NumEntries))
-> m (MergeDebt, NumEntries) -> m (MergeDebt, NumEntries)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> PendingMerge m h -> m (MergeDebt, NumEntries)
forall (m :: * -> *) h.
(MonadMVar m, PrimMonad m) =>
PendingMerge m h -> m (MergeDebt, NumEntries)
remainingMergeDebtPendingMerge PendingMerge m h
ptm
where
addDebtOne :: (MergeDebt, b) -> (MergeDebt, b)
addDebtOne (MergeDebt !MergeCredits
debt, !b
size) = (MergeCredits -> MergeDebt
MergeDebt (MergeCredits
debt MergeCredits -> MergeCredits -> MergeCredits
forall a. Num a => a -> a -> a
+ MergeCredits
1), b
size)
{-# SPECIALISE remainingMergeDebtPendingMerge ::
PendingMerge IO h -> IO (MergeDebt, NumEntries) #-}
remainingMergeDebtPendingMerge ::
(MonadMVar m, PrimMonad m)
=> PendingMerge m h -> m (MergeDebt, NumEntries)
remainingMergeDebtPendingMerge :: forall (m :: * -> *) h.
(MonadMVar m, PrimMonad m) =>
PendingMerge m h -> m (MergeDebt, NumEntries)
remainingMergeDebtPendingMerge (PendingMerge TreeMergeType
_ Vector (PreExistingRun m h)
prs Vector (Ref (MergingTree m h))
mts) = do
Vector (MergeDebt, NumEntries)
debtsPre <- (PreExistingRun m h -> m (MergeDebt, NumEntries))
-> Vector (PreExistingRun m h)
-> m (Vector (MergeDebt, NumEntries))
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> Vector a -> f (Vector b)
traverse PreExistingRun m h -> m (MergeDebt, NumEntries)
forall {h}. PreExistingRun m h -> m (MergeDebt, NumEntries)
remainingMergeDebtPreExistingRun Vector (PreExistingRun m h)
prs
Vector (MergeDebt, NumEntries)
debtsChild <- (Ref (MergingTree m h) -> m (MergeDebt, NumEntries))
-> Vector (Ref (MergingTree m h))
-> m (Vector (MergeDebt, NumEntries))
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> Vector a -> f (Vector b)
traverse Ref (MergingTree m h) -> m (MergeDebt, NumEntries)
forall (m :: * -> *) h.
(MonadMVar m, PrimMonad m) =>
Ref (MergingTree m h) -> m (MergeDebt, NumEntries)
remainingMergeDebt Vector (Ref (MergingTree m h))
mts
(MergeDebt, NumEntries) -> m (MergeDebt, NumEntries)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Vector (MergeDebt, NumEntries) -> (MergeDebt, NumEntries)
debtOfNestedMerge (Vector (MergeDebt, NumEntries)
debtsPre Vector (MergeDebt, NumEntries)
-> Vector (MergeDebt, NumEntries) -> Vector (MergeDebt, NumEntries)
forall a. Semigroup a => a -> a -> a
<> Vector (MergeDebt, NumEntries)
debtsChild))
where
remainingMergeDebtPreExistingRun :: PreExistingRun m h -> m (MergeDebt, NumEntries)
remainingMergeDebtPreExistingRun = \case
PreExistingRun Ref (Run m h)
r -> (MergeDebt, NumEntries) -> m (MergeDebt, NumEntries)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (MergeCredits -> MergeDebt
MergeDebt MergeCredits
0, Ref (Run m h) -> NumEntries
forall (m :: * -> *) h. Ref (Run m h) -> NumEntries
Run.size Ref (Run m h)
r)
PreExistingMergingRun Ref (MergingRun LevelMergeType m h)
mr -> Ref (MergingRun LevelMergeType m h) -> m (MergeDebt, NumEntries)
forall (m :: * -> *) t h.
(MonadMVar m, PrimMonad m) =>
Ref (MergingRun t m h) -> m (MergeDebt, NumEntries)
MR.remainingMergeDebt Ref (MergingRun LevelMergeType m h)
mr
debtOfNestedMerge :: Vector (MergeDebt, NumEntries) -> (MergeDebt, NumEntries)
debtOfNestedMerge :: Vector (MergeDebt, NumEntries) -> (MergeDebt, NumEntries)
debtOfNestedMerge Vector (MergeDebt, NumEntries)
debts =
(MergeCredits -> MergeDebt
MergeDebt (MergeCredits
c MergeCredits -> MergeCredits -> MergeCredits
forall a. Num a => a -> a -> a
+ Int -> MergeCredits
MR.MergeCredits Int
n), Int -> NumEntries
NumEntries Int
n)
where
(MergeDebt MergeCredits
c, NumEntries Int
n) = ((MergeDebt, NumEntries)
-> (MergeDebt, NumEntries) -> (MergeDebt, NumEntries))
-> (MergeDebt, NumEntries)
-> Vector (MergeDebt, NumEntries)
-> (MergeDebt, NumEntries)
forall b a. (b -> a -> b) -> b -> Vector a -> b
forall (t :: * -> *) b a.
Foldable t =>
(b -> a -> b) -> b -> t a -> b
foldl' (MergeDebt, NumEntries)
-> (MergeDebt, NumEntries) -> (MergeDebt, NumEntries)
add (MergeCredits -> MergeDebt
MergeDebt MergeCredits
0, Int -> NumEntries
NumEntries Int
0) Vector (MergeDebt, NumEntries)
debts
add :: (MergeDebt, NumEntries)
-> (MergeDebt, NumEntries) -> (MergeDebt, NumEntries)
add (MergeDebt !MergeCredits
d1, NumEntries !Int
n1) (MergeDebt !MergeCredits
d2, NumEntries !Int
n2) =
(MergeCredits -> MergeDebt
MergeDebt (MergeCredits
d1 MergeCredits -> MergeCredits -> MergeCredits
forall a. Num a => a -> a -> a
+ MergeCredits
d2), Int -> NumEntries
NumEntries (Int
n1 Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
n2))
{-# SPECIALISE supplyCredits ::
HasFS IO h
-> HasBlockIO IO h
-> ResolveSerialisedValue
-> Run.RunParams
-> MR.CreditThreshold
-> SessionRoot
-> UniqCounter IO
-> Ref (MergingTree IO h)
-> MR.MergeCredits
-> IO MR.MergeCredits #-}
supplyCredits ::
forall m h.
(MonadMVar m, MonadST m, MonadSTM m, MonadMask m)
=> HasFS m h
-> HasBlockIO m h
-> ResolveSerialisedValue
-> Run.RunParams
-> MR.CreditThreshold
-> SessionRoot
-> UniqCounter m
-> Ref (MergingTree m h)
-> MR.MergeCredits
-> m MR.MergeCredits
supplyCredits :: forall (m :: * -> *) h.
(MonadMVar m, MonadST m, MonadSTM m, MonadMask m) =>
HasFS m h
-> HasBlockIO m h
-> ResolveSerialisedValue
-> RunParams
-> CreditThreshold
-> SessionRoot
-> UniqCounter m
-> Ref (MergingTree m h)
-> MergeCredits
-> m MergeCredits
supplyCredits HasFS m h
hfs HasBlockIO m h
hbio ResolveSerialisedValue
resolve RunParams
runParams CreditThreshold
threshold SessionRoot
root UniqCounter m
uc = \Ref (MergingTree m h)
mt0 MergeCredits
c0 -> do
if MergeCredits
c0 MergeCredits -> MergeCredits -> Bool
forall a. Ord a => a -> a -> Bool
<= MergeCredits
0
then MergeCredits -> m MergeCredits
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return MergeCredits
0
else Ref (MergingTree m h) -> MergeCredits -> m MergeCredits
supplyTree Ref (MergingTree m h)
mt0 MergeCredits
c0
where
mkFreshRunPaths :: m RunFsPaths
mkFreshRunPaths = do
RunNumber
runNumber <- Unique -> RunNumber
uniqueToRunNumber (Unique -> RunNumber) -> m Unique -> m RunNumber
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> UniqCounter m -> m Unique
forall (m :: * -> *). PrimMonad m => UniqCounter m -> m Unique
incrUniqCounter UniqCounter m
uc
RunFsPaths -> m RunFsPaths
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (ActiveDir -> RunNumber -> RunFsPaths
Paths.runPath (SessionRoot -> ActiveDir
Paths.activeDir SessionRoot
root) RunNumber
runNumber)
supplyTree :: Ref (MergingTree m h) -> MergeCredits -> m MergeCredits
supplyTree =
(Ref (MergingTree m h) -> m (MergeDebt, NumEntries))
-> (Ref (MergingTree m h) -> MergeCredits -> m MergeCredits)
-> Ref (MergingTree m h)
-> MergeCredits
-> m MergeCredits
forall (m :: * -> *) r s.
(HasCallStack, Monad m) =>
(r -> m (MergeDebt, s))
-> (r -> MergeCredits -> m MergeCredits)
-> r
-> MergeCredits
-> m MergeCredits
MR.supplyChecked Ref (MergingTree m h) -> m (MergeDebt, NumEntries)
forall (m :: * -> *) h.
(MonadMVar m, PrimMonad m) =>
Ref (MergingTree m h) -> m (MergeDebt, NumEntries)
remainingMergeDebt ((Ref (MergingTree m h) -> MergeCredits -> m MergeCredits)
-> Ref (MergingTree m h) -> MergeCredits -> m MergeCredits)
-> (Ref (MergingTree m h) -> MergeCredits -> m MergeCredits)
-> Ref (MergingTree m h)
-> MergeCredits
-> m MergeCredits
forall a b. (a -> b) -> a -> b
$ \(DeRef MergingTree m h
mt) MergeCredits
credits ->
m (MergingTreeState m h)
-> (MergingTreeState m h -> m ())
-> (ActionRegistry m
-> MergingTreeState m h -> m (MergingTreeState m h, MergeCredits))
-> m MergeCredits
forall (m :: * -> *) st a.
(PrimMonad m, MonadCatch m) =>
m st
-> (st -> m ()) -> (ActionRegistry m -> st -> m (st, a)) -> m a
modifyWithActionRegistry
(StrictMVar m (MergingTreeState m h) -> m (MergingTreeState m h)
forall (m :: * -> *) a. MonadMVar m => StrictMVar m a -> m a
takeMVar (MergingTree m h -> StrictMVar m (MergingTreeState m h)
forall (m :: * -> *) h.
MergingTree m h -> StrictMVar m (MergingTreeState m h)
mergeState MergingTree m h
mt))
(StrictMVar m (MergingTreeState m h) -> MergingTreeState m h -> m ()
forall (m :: * -> *) a. MonadMVar m => StrictMVar m a -> a -> m ()
putMVar (MergingTree m h -> StrictMVar m (MergingTreeState m h)
forall (m :: * -> *) h.
MergingTree m h -> StrictMVar m (MergingTreeState m h)
mergeState MergingTree m h
mt))
(\ActionRegistry m
reg MergingTreeState m h
state -> ActionRegistry m
-> MergingTreeState m h
-> MergeCredits
-> m (MergingTreeState m h, MergeCredits)
supplyState ActionRegistry m
reg MergingTreeState m h
state MergeCredits
credits)
supplyState :: ActionRegistry m
-> MergingTreeState m h
-> MergeCredits
-> m (MergingTreeState m h, MergeCredits)
supplyState ActionRegistry m
reg MergingTreeState m h
state MergeCredits
credits =
case MergingTreeState m h
state of
CompletedTreeMerge Ref (Run m h)
_ ->
(MergingTreeState m h, MergeCredits)
-> m (MergingTreeState m h, MergeCredits)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (MergingTreeState m h
state, MergeCredits
credits)
OngoingTreeMerge Ref (MergingRun TreeMergeType m h)
mr -> do
MergeCredits
leftovers <- Ref (MergingRun TreeMergeType m h)
-> CreditThreshold -> MergeCredits -> m MergeCredits
forall t (m :: * -> *) h.
(MonadSTM m, MonadST m, MonadMVar m, MonadMask m) =>
Ref (MergingRun t m h)
-> CreditThreshold -> MergeCredits -> m MergeCredits
MR.supplyCreditsRelative Ref (MergingRun TreeMergeType m h)
mr CreditThreshold
threshold MergeCredits
credits
if MergeCredits
leftovers MergeCredits -> MergeCredits -> Bool
forall a. Ord a => a -> a -> Bool
<= MergeCredits
0
then
(MergingTreeState m h, MergeCredits)
-> m (MergingTreeState m h, MergeCredits)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (MergingTreeState m h
state, MergeCredits
0)
else do
Ref (Run m h)
r <- ActionRegistry m
-> m (Ref (Run m h))
-> (Ref (Run m h) -> m ())
-> m (Ref (Run m h))
forall (m :: * -> *) a.
(PrimMonad m, MonadMask m, HasCallStack) =>
ActionRegistry m -> m a -> (a -> m ()) -> m a
withRollback ActionRegistry m
reg (Ref (MergingRun TreeMergeType m h) -> m (Ref (Run m h))
forall (m :: * -> *) t h.
(MonadMVar m, MonadSTM m, MonadST m, MonadMask m) =>
Ref (MergingRun t m h) -> m (Ref (Run m h))
MR.expectCompleted Ref (MergingRun TreeMergeType m h)
mr) Ref (Run m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef
ActionRegistry m -> m () -> m ()
forall (m :: * -> *).
(PrimMonad m, HasCallStack) =>
ActionRegistry m -> m () -> m ()
delayedCommit ActionRegistry m
reg (Ref (MergingRun TreeMergeType m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef Ref (MergingRun TreeMergeType m h)
mr)
(MergingTreeState m h, MergeCredits)
-> m (MergingTreeState m h, MergeCredits)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Ref (Run m h) -> MergingTreeState m h
forall (m :: * -> *) h. Ref (Run m h) -> MergingTreeState m h
CompletedTreeMerge Ref (Run m h)
r, MergeCredits
leftovers)
PendingTreeMerge PendingMerge m h
_
| MergingTreeState m h -> Bool
forall (m :: * -> *) h. MergingTreeState m h -> Bool
isStructurallyEmptyState MergingTreeState m h
state -> do
RunFsPaths
runPaths <- m RunFsPaths
mkFreshRunPaths
Ref (Run m h)
run <-
ActionRegistry m
-> m (Ref (Run m h))
-> (Ref (Run m h) -> m ())
-> m (Ref (Run m h))
forall (m :: * -> *) a.
(PrimMonad m, MonadMask m, HasCallStack) =>
ActionRegistry m -> m a -> (a -> m ()) -> m a
withRollback ActionRegistry m
reg
(HasFS m h
-> HasBlockIO m h -> RunParams -> RunFsPaths -> m (Ref (Run m h))
forall (m :: * -> *) h.
(MonadST m, MonadSTM m, MonadMask m) =>
HasFS m h
-> HasBlockIO m h -> RunParams -> RunFsPaths -> m (Ref (Run m h))
Run.newEmpty HasFS m h
hfs HasBlockIO m h
hbio RunParams
runParams RunFsPaths
runPaths)
Ref (Run m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef
(MergingTreeState m h, MergeCredits)
-> m (MergingTreeState m h, MergeCredits)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Ref (Run m h) -> MergingTreeState m h
forall (m :: * -> *) h. Ref (Run m h) -> MergingTreeState m h
CompletedTreeMerge Ref (Run m h)
run, MergeCredits
credits)
PendingTreeMerge PendingMerge m h
pm -> do
MergeCredits
leftovers <- PendingMerge m h -> MergeCredits -> m MergeCredits
supplyPending PendingMerge m h
pm MergeCredits
credits
if MergeCredits
leftovers MergeCredits -> MergeCredits -> Bool
forall a. Ord a => a -> a -> Bool
<= MergeCredits
0
then
(MergingTreeState m h, MergeCredits)
-> m (MergingTreeState m h, MergeCredits)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (MergingTreeState m h
state, MergeCredits
leftovers)
else do
MergingTreeState m h
state' <- ActionRegistry m -> PendingMerge m h -> m (MergingTreeState m h)
startPendingMerge ActionRegistry m
reg PendingMerge m h
pm
ActionRegistry m
-> MergingTreeState m h
-> MergeCredits
-> m (MergingTreeState m h, MergeCredits)
supplyState ActionRegistry m
reg MergingTreeState m h
state' MergeCredits
leftovers
supplyPending ::
PendingMerge m h -> MR.MergeCredits -> m MR.MergeCredits
supplyPending :: PendingMerge m h -> MergeCredits -> m MergeCredits
supplyPending =
(PendingMerge m h -> m (MergeDebt, NumEntries))
-> (PendingMerge m h -> MergeCredits -> m MergeCredits)
-> PendingMerge m h
-> MergeCredits
-> m MergeCredits
forall (m :: * -> *) r s.
(HasCallStack, Monad m) =>
(r -> m (MergeDebt, s))
-> (r -> MergeCredits -> m MergeCredits)
-> r
-> MergeCredits
-> m MergeCredits
MR.supplyChecked PendingMerge m h -> m (MergeDebt, NumEntries)
forall (m :: * -> *) h.
(MonadMVar m, PrimMonad m) =>
PendingMerge m h -> m (MergeDebt, NumEntries)
remainingMergeDebtPendingMerge ((PendingMerge m h -> MergeCredits -> m MergeCredits)
-> PendingMerge m h -> MergeCredits -> m MergeCredits)
-> (PendingMerge m h -> MergeCredits -> m MergeCredits)
-> PendingMerge m h
-> MergeCredits
-> m MergeCredits
forall a b. (a -> b) -> a -> b
$ \PendingMerge m h
pm MergeCredits
credits -> do
case PendingMerge m h
pm of
PendingLevelMerge Vector (PreExistingRun m h)
prs Maybe (Ref (MergingTree m h))
mt ->
(MergeCredits -> PreExistingRun m h -> m MergeCredits)
-> [PreExistingRun m h] -> MergeCredits -> m MergeCredits
forall a.
(MergeCredits -> a -> m MergeCredits)
-> [a] -> MergeCredits -> m MergeCredits
leftToRight MergeCredits -> PreExistingRun m h -> m MergeCredits
supplyPreExisting (Vector (PreExistingRun m h) -> [PreExistingRun m h]
forall a. Vector a -> [a]
V.toList Vector (PreExistingRun m h)
prs) MergeCredits
credits
m MergeCredits
-> (MergeCredits -> m MergeCredits) -> m MergeCredits
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (MergeCredits -> Ref (MergingTree m h) -> m MergeCredits)
-> [Ref (MergingTree m h)] -> MergeCredits -> m MergeCredits
forall a.
(MergeCredits -> a -> m MergeCredits)
-> [a] -> MergeCredits -> m MergeCredits
leftToRight ((Ref (MergingTree m h) -> MergeCredits -> m MergeCredits)
-> MergeCredits -> Ref (MergingTree m h) -> m MergeCredits
forall a b c. (a -> b -> c) -> b -> a -> c
flip Ref (MergingTree m h) -> MergeCredits -> m MergeCredits
supplyTree) (Maybe (Ref (MergingTree m h)) -> [Ref (MergingTree m h)]
forall a. Maybe a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList Maybe (Ref (MergingTree m h))
mt)
PendingUnionMerge Vector (Ref (MergingTree m h))
mts ->
(MergeCredits -> Ref (MergingTree m h) -> m MergeCredits)
-> [Ref (MergingTree m h)] -> MergeCredits -> m MergeCredits
forall a.
(MergeCredits -> a -> m MergeCredits)
-> [a] -> MergeCredits -> m MergeCredits
splitEqually ((Ref (MergingTree m h) -> MergeCredits -> m MergeCredits)
-> MergeCredits -> Ref (MergingTree m h) -> m MergeCredits
forall a b c. (a -> b -> c) -> b -> a -> c
flip Ref (MergingTree m h) -> MergeCredits -> m MergeCredits
supplyTree) (Vector (Ref (MergingTree m h)) -> [Ref (MergingTree m h)]
forall a. Vector a -> [a]
V.toList Vector (Ref (MergingTree m h))
mts) MergeCredits
credits
supplyPreExisting :: MergeCredits -> PreExistingRun m h -> m MergeCredits
supplyPreExisting MergeCredits
c = \case
PreExistingRun Ref (Run m h)
_r -> MergeCredits -> m MergeCredits
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return MergeCredits
c
PreExistingMergingRun Ref (MergingRun LevelMergeType m h)
mr -> Ref (MergingRun LevelMergeType m h)
-> CreditThreshold -> MergeCredits -> m MergeCredits
forall t (m :: * -> *) h.
(MonadSTM m, MonadST m, MonadMVar m, MonadMask m) =>
Ref (MergingRun t m h)
-> CreditThreshold -> MergeCredits -> m MergeCredits
MR.supplyCreditsRelative Ref (MergingRun LevelMergeType m h)
mr CreditThreshold
threshold MergeCredits
c
leftToRight ::
(MR.MergeCredits -> a -> m MR.MergeCredits)
-> [a] -> MR.MergeCredits -> m MR.MergeCredits
leftToRight :: forall a.
(MergeCredits -> a -> m MergeCredits)
-> [a] -> MergeCredits -> m MergeCredits
leftToRight MergeCredits -> a -> m MergeCredits
_ [a]
_ MergeCredits
0 = MergeCredits -> m MergeCredits
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return MergeCredits
0
leftToRight MergeCredits -> a -> m MergeCredits
_ [] MergeCredits
c = MergeCredits -> m MergeCredits
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return MergeCredits
c
leftToRight MergeCredits -> a -> m MergeCredits
f (a
x:[a]
xs) MergeCredits
c = MergeCredits -> a -> m MergeCredits
f MergeCredits
c a
x m MergeCredits
-> (MergeCredits -> m MergeCredits) -> m MergeCredits
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (MergeCredits -> a -> m MergeCredits)
-> [a] -> MergeCredits -> m MergeCredits
forall a.
(MergeCredits -> a -> m MergeCredits)
-> [a] -> MergeCredits -> m MergeCredits
leftToRight MergeCredits -> a -> m MergeCredits
f [a]
xs
splitEqually ::
(MR.MergeCredits -> a -> m MR.MergeCredits)
-> [a] -> MR.MergeCredits -> m MR.MergeCredits
splitEqually :: forall a.
(MergeCredits -> a -> m MergeCredits)
-> [a] -> MergeCredits -> m MergeCredits
splitEqually MergeCredits -> a -> m MergeCredits
f [a]
xs (MR.MergeCredits Int
credits) =
(MergeCredits -> a -> m MergeCredits)
-> MergeCredits -> [a] -> m MergeCredits
forall (t :: * -> *) (m :: * -> *) b a.
(Foldable t, Monad m) =>
(b -> a -> m b) -> b -> t a -> m b
foldM MergeCredits -> a -> m MergeCredits
supplyNth (Int -> MergeCredits
MR.MergeCredits Int
credits) [a]
xs m MergeCredits
-> (MergeCredits -> m MergeCredits) -> m MergeCredits
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (MergeCredits -> a -> m MergeCredits)
-> [a] -> MergeCredits -> m MergeCredits
forall a.
(MergeCredits -> a -> m MergeCredits)
-> [a] -> MergeCredits -> m MergeCredits
leftToRight MergeCredits -> a -> m MergeCredits
f [a]
xs
where
!n :: Int
n = [a] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [a]
xs
!k :: MergeCredits
k = Int -> MergeCredits
MR.MergeCredits ((Int
credits Int -> Int -> Int
forall a. Num a => a -> a -> a
+ (Int
n Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
1)) Int -> Int -> Int
forall a. Integral a => a -> a -> a
`div` Int
n)
supplyNth :: MergeCredits -> a -> m MergeCredits
supplyNth MergeCredits
0 a
_ = MergeCredits -> m MergeCredits
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return MergeCredits
0
supplyNth MergeCredits
c a
t = do
let creditsToSpend :: MergeCredits
creditsToSpend = MergeCredits -> MergeCredits -> MergeCredits
forall a. Ord a => a -> a -> a
min MergeCredits
k MergeCredits
c
MergeCredits
leftovers <- MergeCredits -> a -> m MergeCredits
f MergeCredits
creditsToSpend a
t
MergeCredits -> m MergeCredits
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (MergeCredits
c MergeCredits -> MergeCredits -> MergeCredits
forall a. Num a => a -> a -> a
- MergeCredits
creditsToSpend MergeCredits -> MergeCredits -> MergeCredits
forall a. Num a => a -> a -> a
+ MergeCredits
leftovers)
startPendingMerge :: ActionRegistry m -> PendingMerge m h -> m (MergingTreeState m h)
startPendingMerge ActionRegistry m
reg PendingMerge m h
pm = do
(TreeMergeType
mergeType, Vector (Ref (Run m h))
rs) <- ActionRegistry m
-> PendingMerge m h -> m (TreeMergeType, Vector (Ref (Run m h)))
expectCompletedChildren ActionRegistry m
reg PendingMerge m h
pm
Bool -> m () -> m ()
forall a. HasCallStack => Bool -> a -> a
assert (Vector (Ref (Run m h)) -> Int
forall a. Vector a -> Int
V.length Vector (Ref (Run m h))
rs Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ () -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
RunFsPaths
runPaths <- m RunFsPaths
mkFreshRunPaths
Ref (MergingRun TreeMergeType m h)
mr <-
ActionRegistry m
-> m (Ref (MergingRun TreeMergeType m h))
-> (Ref (MergingRun TreeMergeType m h) -> m ())
-> m (Ref (MergingRun TreeMergeType m h))
forall (m :: * -> *) a.
(PrimMonad m, MonadMask m, HasCallStack) =>
ActionRegistry m -> m a -> (a -> m ()) -> m a
withRollback ActionRegistry m
reg
(HasFS m h
-> HasBlockIO m h
-> ResolveSerialisedValue
-> RunParams
-> TreeMergeType
-> RunFsPaths
-> Vector (Ref (Run m h))
-> m (Ref (MergingRun TreeMergeType m h))
forall t (m :: * -> *) h.
(IsMergeType t, MonadMVar m, MonadMask m, MonadSTM m, MonadST m) =>
HasFS m h
-> HasBlockIO m h
-> ResolveSerialisedValue
-> RunParams
-> t
-> RunFsPaths
-> Vector (Ref (Run m h))
-> m (Ref (MergingRun t m h))
MR.new HasFS m h
hfs HasBlockIO m h
hbio ResolveSerialisedValue
resolve RunParams
runParams TreeMergeType
mergeType RunFsPaths
runPaths Vector (Ref (Run m h))
rs)
Ref (MergingRun TreeMergeType m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef
(Ref (Run m h) -> m ()) -> Vector (Ref (Run m h)) -> m ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ (\Ref (Run m h)
r -> ActionRegistry m -> m () -> m ()
forall (m :: * -> *).
(PrimMonad m, HasCallStack) =>
ActionRegistry m -> m () -> m ()
delayedCommit ActionRegistry m
reg (Ref (Run m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef Ref (Run m h)
r)) Vector (Ref (Run m h))
rs
MergingTreeState m h -> m (MergingTreeState m h)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Ref (MergingRun TreeMergeType m h) -> MergingTreeState m h
forall (m :: * -> *) h.
Ref (MergingRun TreeMergeType m h) -> MergingTreeState m h
OngoingTreeMerge Ref (MergingRun TreeMergeType m h)
mr)
expectCompletedChildren ::
ActionRegistry m
-> PendingMerge m h
-> m (MR.TreeMergeType, Vector (Ref (Run m h)))
expectCompletedChildren :: ActionRegistry m
-> PendingMerge m h -> m (TreeMergeType, Vector (Ref (Run m h)))
expectCompletedChildren ActionRegistry m
reg (PendingMerge TreeMergeType
ty Vector (PreExistingRun m h)
prs Vector (Ref (MergingTree m h))
mts) = do
Vector (Ref (Run m h))
rs1 <- Vector (PreExistingRun m h)
-> (PreExistingRun m h -> m (Ref (Run m h)))
-> m (Vector (Ref (Run m h)))
forall (m :: * -> *) a b.
Monad m =>
Vector a -> (a -> m b) -> m (Vector b)
V.forM Vector (PreExistingRun m h)
prs ((PreExistingRun m h -> m (Ref (Run m h)))
-> m (Vector (Ref (Run m h))))
-> (PreExistingRun m h -> m (Ref (Run m h)))
-> m (Vector (Ref (Run m h)))
forall a b. (a -> b) -> a -> b
$ \case
PreExistingRun Ref (Run m h)
r -> do
ActionRegistry m -> m () -> m ()
forall (m :: * -> *).
(PrimMonad m, HasCallStack) =>
ActionRegistry m -> m () -> m ()
delayedCommit ActionRegistry m
reg (Ref (Run m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef Ref (Run m h)
r)
ActionRegistry m
-> m (Ref (Run m h))
-> (Ref (Run m h) -> m ())
-> m (Ref (Run m h))
forall (m :: * -> *) a.
(PrimMonad m, MonadMask m, HasCallStack) =>
ActionRegistry m -> m a -> (a -> m ()) -> m a
withRollback ActionRegistry m
reg (Ref (Run m h) -> m (Ref (Run m h))
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadThrow m, HasCallStack) =>
Ref obj -> m (Ref obj)
dupRef Ref (Run m h)
r) Ref (Run m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef
PreExistingMergingRun Ref (MergingRun LevelMergeType m h)
mr -> do
ActionRegistry m -> m () -> m ()
forall (m :: * -> *).
(PrimMonad m, HasCallStack) =>
ActionRegistry m -> m () -> m ()
delayedCommit ActionRegistry m
reg (Ref (MergingRun LevelMergeType m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef Ref (MergingRun LevelMergeType m h)
mr)
ActionRegistry m
-> m (Ref (Run m h))
-> (Ref (Run m h) -> m ())
-> m (Ref (Run m h))
forall (m :: * -> *) a.
(PrimMonad m, MonadMask m, HasCallStack) =>
ActionRegistry m -> m a -> (a -> m ()) -> m a
withRollback ActionRegistry m
reg (Ref (MergingRun LevelMergeType m h) -> m (Ref (Run m h))
forall (m :: * -> *) t h.
(MonadMVar m, MonadSTM m, MonadST m, MonadMask m) =>
Ref (MergingRun t m h) -> m (Ref (Run m h))
MR.expectCompleted Ref (MergingRun LevelMergeType m h)
mr) Ref (Run m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef
Vector (Ref (Run m h))
rs2 <- Vector (Ref (MergingTree m h))
-> (Ref (MergingTree m h) -> m (Ref (Run m h)))
-> m (Vector (Ref (Run m h)))
forall (m :: * -> *) a b.
Monad m =>
Vector a -> (a -> m b) -> m (Vector b)
V.forM Vector (Ref (MergingTree m h))
mts ((Ref (MergingTree m h) -> m (Ref (Run m h)))
-> m (Vector (Ref (Run m h))))
-> (Ref (MergingTree m h) -> m (Ref (Run m h)))
-> m (Vector (Ref (Run m h)))
forall a b. (a -> b) -> a -> b
$ \Ref (MergingTree m h)
mt -> do
ActionRegistry m -> m () -> m ()
forall (m :: * -> *).
(PrimMonad m, HasCallStack) =>
ActionRegistry m -> m () -> m ()
delayedCommit ActionRegistry m
reg (Ref (MergingTree m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef Ref (MergingTree m h)
mt)
ActionRegistry m
-> m (Ref (Run m h))
-> (Ref (Run m h) -> m ())
-> m (Ref (Run m h))
forall (m :: * -> *) a.
(PrimMonad m, MonadMask m, HasCallStack) =>
ActionRegistry m -> m a -> (a -> m ()) -> m a
withRollback ActionRegistry m
reg (Ref (MergingTree m h) -> m (Ref (Run m h))
forall (m :: * -> *) h.
(MonadMVar m, MonadSTM m, MonadST m, MonadMask m) =>
Ref (MergingTree m h) -> m (Ref (Run m h))
expectCompleted Ref (MergingTree m h)
mt) Ref (Run m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef
(TreeMergeType, Vector (Ref (Run m h)))
-> m (TreeMergeType, Vector (Ref (Run m h)))
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (TreeMergeType
ty, Vector (Ref (Run m h))
rs1 Vector (Ref (Run m h))
-> Vector (Ref (Run m h)) -> Vector (Ref (Run m h))
forall a. Semigroup a => a -> a -> a
<> Vector (Ref (Run m h))
rs2)
expectCompleted ::
(MonadMVar m, MonadSTM m, MonadST m, MonadMask m)
=> Ref (MergingTree m h) -> m (Ref (Run m h))
expectCompleted :: forall (m :: * -> *) h.
(MonadMVar m, MonadSTM m, MonadST m, MonadMask m) =>
Ref (MergingTree m h) -> m (Ref (Run m h))
expectCompleted (DeRef MergingTree {StrictMVar m (MergingTreeState m h)
RefCounter m
mergeState :: forall (m :: * -> *) h.
MergingTree m h -> StrictMVar m (MergingTreeState m h)
mergeRefCounter :: forall (m :: * -> *) h. MergingTree m h -> RefCounter m
mergeState :: StrictMVar m (MergingTreeState m h)
mergeRefCounter :: RefCounter m
..}) = do
StrictMVar m (MergingTreeState m h)
-> (MergingTreeState m h -> m (Ref (Run m h))) -> m (Ref (Run m h))
forall (m :: * -> *) a b.
MonadMVar m =>
StrictMVar m a -> (a -> m b) -> m b
withMVar StrictMVar m (MergingTreeState m h)
mergeState ((MergingTreeState m h -> m (Ref (Run m h))) -> m (Ref (Run m h)))
-> (MergingTreeState m h -> m (Ref (Run m h))) -> m (Ref (Run m h))
forall a b. (a -> b) -> a -> b
$ \case
CompletedTreeMerge Ref (Run m h)
r -> Ref (Run m h) -> m (Ref (Run m h))
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadThrow m, HasCallStack) =>
Ref obj -> m (Ref obj)
dupRef Ref (Run m h)
r
OngoingTreeMerge Ref (MergingRun TreeMergeType m h)
mr -> Ref (MergingRun TreeMergeType m h) -> m (Ref (Run m h))
forall (m :: * -> *) t h.
(MonadMVar m, MonadSTM m, MonadST m, MonadMask m) =>
Ref (MergingRun t m h) -> m (Ref (Run m h))
MR.expectCompleted Ref (MergingRun TreeMergeType m h)
mr
PendingTreeMerge{} ->
[Char] -> m (Ref (Run m h))
forall a. HasCallStack => [Char] -> a
error [Char]
"expectCompleted: expected a completed merging tree, but found a pending one"