{-# OPTIONS_HADDOCK not-home #-}
module Database.LSMTree.Internal.Readers (
Readers (..)
, OffsetKey (..)
, ReaderSource (..)
, ReadersMergeType (..)
, new
, close
, peekKey
, HasMore (..)
, pop
, dropWhileKey
, Reader (..)
, ReaderNumber (..)
, ReadCtx (..)
) where
import Control.Monad (zipWithM)
import Control.Monad.Class.MonadST (MonadST)
import Control.Monad.Class.MonadSTM (MonadSTM (..))
import Control.Monad.Class.MonadThrow (MonadMask)
import Control.Monad.Primitive
import Control.RefCount
import Data.Function (on)
import Data.Functor ((<&>))
import Data.List.NonEmpty (nonEmpty)
import qualified Data.Map.Strict as Map
import Data.Maybe (catMaybes)
import Data.Primitive.MutVar
import Data.Traversable (for)
import Database.LSMTree.Internal.BlobRef (BlobSpan, RawBlobRef)
import Database.LSMTree.Internal.Entry (Entry (..))
import qualified Database.LSMTree.Internal.Entry as Entry
import Database.LSMTree.Internal.Index.CompactAcc (SMaybe (..),
smaybe)
import Database.LSMTree.Internal.Run (Run)
import Database.LSMTree.Internal.RunReader (OffsetKey (..),
RunReader (..))
import qualified Database.LSMTree.Internal.RunReader as RunReader
import Database.LSMTree.Internal.Serialise
import qualified Database.LSMTree.Internal.WriteBuffer as WB
import qualified Database.LSMTree.Internal.WriteBufferBlobs as WB
import qualified KMerge.Heap as Heap
import qualified System.FS.API as FS
data Readers m h = Readers {
forall (m :: * -> *) h.
Readers m h -> MutableHeap (PrimState m) (ReadCtx m h)
readersHeap :: !(Heap.MutableHeap (PrimState m) (ReadCtx m h))
, forall (m :: * -> *) h.
Readers m h -> MutVar (PrimState m) (ReadCtx m h)
readersNext :: !(MutVar (PrimState m) (ReadCtx m h))
}
newtype ReaderNumber = ReaderNumber Int
deriving stock (ReaderNumber -> ReaderNumber -> Bool
(ReaderNumber -> ReaderNumber -> Bool)
-> (ReaderNumber -> ReaderNumber -> Bool) -> Eq ReaderNumber
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: ReaderNumber -> ReaderNumber -> Bool
== :: ReaderNumber -> ReaderNumber -> Bool
$c/= :: ReaderNumber -> ReaderNumber -> Bool
/= :: ReaderNumber -> ReaderNumber -> Bool
Eq, Eq ReaderNumber
Eq ReaderNumber =>
(ReaderNumber -> ReaderNumber -> Ordering)
-> (ReaderNumber -> ReaderNumber -> Bool)
-> (ReaderNumber -> ReaderNumber -> Bool)
-> (ReaderNumber -> ReaderNumber -> Bool)
-> (ReaderNumber -> ReaderNumber -> Bool)
-> (ReaderNumber -> ReaderNumber -> ReaderNumber)
-> (ReaderNumber -> ReaderNumber -> ReaderNumber)
-> Ord ReaderNumber
ReaderNumber -> ReaderNumber -> Bool
ReaderNumber -> ReaderNumber -> Ordering
ReaderNumber -> ReaderNumber -> ReaderNumber
forall a.
Eq a =>
(a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
$ccompare :: ReaderNumber -> ReaderNumber -> Ordering
compare :: ReaderNumber -> ReaderNumber -> Ordering
$c< :: ReaderNumber -> ReaderNumber -> Bool
< :: ReaderNumber -> ReaderNumber -> Bool
$c<= :: ReaderNumber -> ReaderNumber -> Bool
<= :: ReaderNumber -> ReaderNumber -> Bool
$c> :: ReaderNumber -> ReaderNumber -> Bool
> :: ReaderNumber -> ReaderNumber -> Bool
$c>= :: ReaderNumber -> ReaderNumber -> Bool
>= :: ReaderNumber -> ReaderNumber -> Bool
$cmax :: ReaderNumber -> ReaderNumber -> ReaderNumber
max :: ReaderNumber -> ReaderNumber -> ReaderNumber
$cmin :: ReaderNumber -> ReaderNumber -> ReaderNumber
min :: ReaderNumber -> ReaderNumber -> ReaderNumber
Ord)
data ReadCtx m h = ReadCtx {
forall (m :: * -> *) h. ReadCtx m h -> SerialisedKey
readCtxHeadKey :: !SerialisedKey
, forall (m :: * -> *) h. ReadCtx m h -> Entry m h
readCtxHeadEntry :: !(RunReader.Entry m h)
, forall (m :: * -> *) h. ReadCtx m h -> ReaderNumber
readCtxNumber :: !ReaderNumber
, forall (m :: * -> *) h. ReadCtx m h -> Reader m h
readCtxReader :: !(Reader m h)
}
instance Eq (ReadCtx m h) where
== :: ReadCtx m h -> ReadCtx m h -> Bool
(==) = (SerialisedKey, ReaderNumber)
-> (SerialisedKey, ReaderNumber) -> Bool
forall a. Eq a => a -> a -> Bool
(==) ((SerialisedKey, ReaderNumber)
-> (SerialisedKey, ReaderNumber) -> Bool)
-> (ReadCtx m h -> (SerialisedKey, ReaderNumber))
-> ReadCtx m h
-> ReadCtx m h
-> Bool
forall b c a. (b -> b -> c) -> (a -> b) -> a -> a -> c
`on` (\ReadCtx m h
r -> (ReadCtx m h -> SerialisedKey
forall (m :: * -> *) h. ReadCtx m h -> SerialisedKey
readCtxHeadKey ReadCtx m h
r, ReadCtx m h -> ReaderNumber
forall (m :: * -> *) h. ReadCtx m h -> ReaderNumber
readCtxNumber ReadCtx m h
r))
instance Ord (ReadCtx m h) where
compare :: ReadCtx m h -> ReadCtx m h -> Ordering
compare = (SerialisedKey, ReaderNumber)
-> (SerialisedKey, ReaderNumber) -> Ordering
forall a. Ord a => a -> a -> Ordering
compare ((SerialisedKey, ReaderNumber)
-> (SerialisedKey, ReaderNumber) -> Ordering)
-> (ReadCtx m h -> (SerialisedKey, ReaderNumber))
-> ReadCtx m h
-> ReadCtx m h
-> Ordering
forall b c a. (b -> b -> c) -> (a -> b) -> a -> a -> c
`on` (\ReadCtx m h
r -> (ReadCtx m h -> SerialisedKey
forall (m :: * -> *) h. ReadCtx m h -> SerialisedKey
readCtxHeadKey ReadCtx m h
r, ReadCtx m h -> ReaderNumber
forall (m :: * -> *) h. ReadCtx m h -> ReaderNumber
readCtxNumber ReadCtx m h
r))
data Reader m h =
ReadBuffer !(MutVar (PrimState m) [KOp m h])
| ReadRun !(RunReader m h)
| ReadReaders !ReadersMergeType !(SMaybe (Readers m h))
type KOp m h = (SerialisedKey, Entry SerialisedValue (RawBlobRef m h))
data ReaderSource m h =
FromWriteBuffer !WB.WriteBuffer !(Ref (WB.WriteBufferBlobs m h))
| FromRun !(Ref (Run m h))
| FromReaders !ReadersMergeType ![ReaderSource m h]
{-# SPECIALISE new ::
ResolveSerialisedValue
-> OffsetKey
-> [ReaderSource IO h]
-> IO (Maybe (Readers IO h)) #-}
new :: forall m h.
(MonadMask m, MonadST m, MonadSTM m)
=> ResolveSerialisedValue
-> OffsetKey
-> [ReaderSource m h]
-> m (Maybe (Readers m h))
new :: forall (m :: * -> *) h.
(MonadMask m, MonadST m, MonadSTM m) =>
ResolveSerialisedValue
-> OffsetKey -> [ReaderSource m h] -> m (Maybe (Readers m h))
new ResolveSerialisedValue
resolve !OffsetKey
offsetKey [ReaderSource m h]
sources = do
[Maybe (ReadCtx m h)]
readers <- (Int -> ReaderSource m h -> m (Maybe (ReadCtx m h)))
-> [Int] -> [ReaderSource m h] -> m [Maybe (ReadCtx m h)]
forall (m :: * -> *) a b c.
Applicative m =>
(a -> b -> m c) -> [a] -> [b] -> m [c]
zipWithM (ReaderNumber -> ReaderSource m h -> m (Maybe (ReadCtx m h))
fromSource (ReaderNumber -> ReaderSource m h -> m (Maybe (ReadCtx m h)))
-> (Int -> ReaderNumber)
-> Int
-> ReaderSource m h
-> m (Maybe (ReadCtx m h))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> ReaderNumber
ReaderNumber) [Int
1..] [ReaderSource m h]
sources
Maybe (NonEmpty (ReadCtx m h))
-> (NonEmpty (ReadCtx m h) -> m (Readers m h))
-> m (Maybe (Readers m h))
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
t a -> (a -> f b) -> f (t b)
for ([ReadCtx m h] -> Maybe (NonEmpty (ReadCtx m h))
forall a. [a] -> Maybe (NonEmpty a)
nonEmpty ([Maybe (ReadCtx m h)] -> [ReadCtx m h]
forall a. [Maybe a] -> [a]
catMaybes [Maybe (ReadCtx m h)]
readers)) ((NonEmpty (ReadCtx m h) -> m (Readers m h))
-> m (Maybe (Readers m h)))
-> (NonEmpty (ReadCtx m h) -> m (Readers m h))
-> m (Maybe (Readers m h))
forall a b. (a -> b) -> a -> b
$ \NonEmpty (ReadCtx m h)
xs -> do
(MutableHeap (PrimState m) (ReadCtx m h)
readersHeap, ReadCtx m h
readCtx) <- NonEmpty (ReadCtx m h)
-> m (MutableHeap (PrimState m) (ReadCtx m h), ReadCtx m h)
forall a (m :: * -> *).
(PrimMonad m, Ord a) =>
NonEmpty a -> m (MutableHeap (PrimState m) a, a)
Heap.newMutableHeap NonEmpty (ReadCtx m h)
xs
MutVar (PrimState m) (ReadCtx m h)
readersNext <- ReadCtx m h -> m (MutVar (PrimState m) (ReadCtx m h))
forall (m :: * -> *) a.
PrimMonad m =>
a -> m (MutVar (PrimState m) a)
newMutVar ReadCtx m h
readCtx
Readers m h -> m (Readers m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Readers {MutableHeap (PrimState m) (ReadCtx m h)
MutVar (PrimState m) (ReadCtx m h)
readersHeap :: MutableHeap (PrimState m) (ReadCtx m h)
readersNext :: MutVar (PrimState m) (ReadCtx m h)
readersHeap :: MutableHeap (PrimState m) (ReadCtx m h)
readersNext :: MutVar (PrimState m) (ReadCtx m h)
..}
where
fromSource :: ReaderNumber -> ReaderSource m h -> m (Maybe (ReadCtx m h))
fromSource :: ReaderNumber -> ReaderSource m h -> m (Maybe (ReadCtx m h))
fromSource ReaderNumber
n ReaderSource m h
src =
case ReaderSource m h
src of
FromWriteBuffer WriteBuffer
wb Ref (WriteBufferBlobs m h)
wbblobs -> do
Reader m h
rs <- WriteBuffer -> Ref (WriteBufferBlobs m h) -> m (Reader m h)
fromWB WriteBuffer
wb Ref (WriteBufferBlobs m h)
wbblobs
ResolveSerialisedValue
-> ReaderNumber -> Reader m h -> m (Maybe (ReadCtx m h))
forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadST m) =>
ResolveSerialisedValue
-> ReaderNumber -> Reader m h -> m (Maybe (ReadCtx m h))
nextReadCtx ResolveSerialisedValue
resolve ReaderNumber
n Reader m h
rs
FromRun Ref (Run m h)
r -> do
Reader m h
rs <- RunReader m h -> Reader m h
forall (m :: * -> *) h. RunReader m h -> Reader m h
ReadRun (RunReader m h -> Reader m h)
-> m (RunReader m h) -> m (Reader m h)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> OffsetKey -> Ref (Run m h) -> m (RunReader m h)
forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, PrimMonad m) =>
OffsetKey -> Ref (Run m h) -> m (RunReader m h)
RunReader.new OffsetKey
offsetKey Ref (Run m h)
r
ResolveSerialisedValue
-> ReaderNumber -> Reader m h -> m (Maybe (ReadCtx m h))
forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadST m) =>
ResolveSerialisedValue
-> ReaderNumber -> Reader m h -> m (Maybe (ReadCtx m h))
nextReadCtx ResolveSerialisedValue
resolve ReaderNumber
n Reader m h
rs
FromReaders ReadersMergeType
mergeType [ReaderSource m h]
nestedSources -> do
ResolveSerialisedValue
-> OffsetKey -> [ReaderSource m h] -> m (Maybe (Readers m h))
forall (m :: * -> *) h.
(MonadMask m, MonadST m, MonadSTM m) =>
ResolveSerialisedValue
-> OffsetKey -> [ReaderSource m h] -> m (Maybe (Readers m h))
new ResolveSerialisedValue
resolve OffsetKey
offsetKey [ReaderSource m h]
nestedSources m (Maybe (Readers m h))
-> (Maybe (Readers m h) -> m (Maybe (ReadCtx m h)))
-> m (Maybe (ReadCtx m h))
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Maybe (Readers m h)
Nothing -> Maybe (ReadCtx m h) -> m (Maybe (ReadCtx m h))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe (ReadCtx m h)
forall a. Maybe a
Nothing
Just Readers m h
rs -> ResolveSerialisedValue
-> ReaderNumber -> Reader m h -> m (Maybe (ReadCtx m h))
forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadST m) =>
ResolveSerialisedValue
-> ReaderNumber -> Reader m h -> m (Maybe (ReadCtx m h))
nextReadCtx ResolveSerialisedValue
resolve ReaderNumber
n (ReadersMergeType -> SMaybe (Readers m h) -> Reader m h
forall (m :: * -> *) h.
ReadersMergeType -> SMaybe (Readers m h) -> Reader m h
ReadReaders ReadersMergeType
mergeType (Readers m h -> SMaybe (Readers m h)
forall a. a -> SMaybe a
SJust Readers m h
rs))
fromWB :: WB.WriteBuffer -> Ref (WB.WriteBufferBlobs m h) -> m (Reader m h)
fromWB :: WriteBuffer -> Ref (WriteBufferBlobs m h) -> m (Reader m h)
fromWB WriteBuffer
wb Ref (WriteBufferBlobs m h)
wbblobs = do
let kops :: [(SerialisedKey, Entry SerialisedValue BlobSpan)]
kops = Map SerialisedKey (Entry SerialisedValue BlobSpan)
-> [(SerialisedKey, Entry SerialisedValue BlobSpan)]
forall k a. Map k a -> [(k, a)]
Map.toList (Map SerialisedKey (Entry SerialisedValue BlobSpan)
-> [(SerialisedKey, Entry SerialisedValue BlobSpan)])
-> Map SerialisedKey (Entry SerialisedValue BlobSpan)
-> [(SerialisedKey, Entry SerialisedValue BlobSpan)]
forall a b. (a -> b) -> a -> b
$ Map SerialisedKey (Entry SerialisedValue BlobSpan)
-> Map SerialisedKey (Entry SerialisedValue BlobSpan)
filterWB (Map SerialisedKey (Entry SerialisedValue BlobSpan)
-> Map SerialisedKey (Entry SerialisedValue BlobSpan))
-> Map SerialisedKey (Entry SerialisedValue BlobSpan)
-> Map SerialisedKey (Entry SerialisedValue BlobSpan)
forall a b. (a -> b) -> a -> b
$ WriteBuffer -> Map SerialisedKey (Entry SerialisedValue BlobSpan)
WB.toMap WriteBuffer
wb
MutVar (PrimState m) [KOp m h] -> Reader m h
forall (m :: * -> *) h.
MutVar (PrimState m) [KOp m h] -> Reader m h
ReadBuffer (MutVar (PrimState m) [KOp m h] -> Reader m h)
-> m (MutVar (PrimState m) [KOp m h]) -> m (Reader m h)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [KOp m h] -> m (MutVar (PrimState m) [KOp m h])
forall (m :: * -> *) a.
PrimMonad m =>
a -> m (MutVar (PrimState m) a)
newMutVar (((SerialisedKey, Entry SerialisedValue BlobSpan) -> KOp m h)
-> [(SerialisedKey, Entry SerialisedValue BlobSpan)] -> [KOp m h]
forall a b. (a -> b) -> [a] -> [b]
map (SerialisedKey, Entry SerialisedValue BlobSpan) -> KOp m h
forall k v. (k, Entry v BlobSpan) -> (k, Entry v (RawBlobRef m h))
convertBlobs [(SerialisedKey, Entry SerialisedValue BlobSpan)]
kops)
where
convertBlobs :: (k, Entry v BlobSpan) -> (k, Entry v (RawBlobRef m h))
convertBlobs :: forall k v. (k, Entry v BlobSpan) -> (k, Entry v (RawBlobRef m h))
convertBlobs = (Entry v BlobSpan -> Entry v (RawBlobRef m h))
-> (k, Entry v BlobSpan) -> (k, Entry v (RawBlobRef m h))
forall a b. (a -> b) -> (k, a) -> (k, b)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ((BlobSpan -> RawBlobRef m h)
-> Entry v BlobSpan -> Entry v (RawBlobRef m h)
forall a b. (a -> b) -> Entry v a -> Entry v b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (Ref (WriteBufferBlobs m h) -> BlobSpan -> RawBlobRef m h
forall (m :: * -> *) h.
Ref (WriteBufferBlobs m h) -> BlobSpan -> RawBlobRef m h
WB.mkRawBlobRef Ref (WriteBufferBlobs m h)
wbblobs))
filterWB :: Map SerialisedKey (Entry SerialisedValue BlobSpan)
-> Map SerialisedKey (Entry SerialisedValue BlobSpan)
filterWB = case OffsetKey
offsetKey of
OffsetKey
NoOffsetKey -> Map SerialisedKey (Entry SerialisedValue BlobSpan)
-> Map SerialisedKey (Entry SerialisedValue BlobSpan)
forall a. a -> a
id
OffsetKey SerialisedKey
k -> (SerialisedKey -> Bool)
-> Map SerialisedKey (Entry SerialisedValue BlobSpan)
-> Map SerialisedKey (Entry SerialisedValue BlobSpan)
forall k a. (k -> Bool) -> Map k a -> Map k a
Map.dropWhileAntitone (SerialisedKey -> SerialisedKey -> Bool
forall a. Ord a => a -> a -> Bool
< SerialisedKey
k)
{-# SPECIALISE close :: Readers IO (FS.Handle h) -> IO () #-}
close ::
(MonadMask m, MonadSTM m, PrimMonad m)
=> Readers m h
-> m ()
close :: forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, PrimMonad m) =>
Readers m h -> m ()
close Readers {MutableHeap (PrimState m) (ReadCtx m h)
MutVar (PrimState m) (ReadCtx m h)
readersHeap :: forall (m :: * -> *) h.
Readers m h -> MutableHeap (PrimState m) (ReadCtx m h)
readersNext :: forall (m :: * -> *) h.
Readers m h -> MutVar (PrimState m) (ReadCtx m h)
readersHeap :: MutableHeap (PrimState m) (ReadCtx m h)
readersNext :: MutVar (PrimState m) (ReadCtx m h)
..} = do
ReadCtx {Reader m h
readCtxReader :: forall (m :: * -> *) h. ReadCtx m h -> Reader m h
readCtxReader :: Reader m h
readCtxReader} <- MutVar (PrimState m) (ReadCtx m h) -> m (ReadCtx m h)
forall (m :: * -> *) a.
PrimMonad m =>
MutVar (PrimState m) a -> m a
readMutVar MutVar (PrimState m) (ReadCtx m h)
readersNext
Reader m h -> m ()
forall {h}. Reader m h -> m ()
closeReader Reader m h
readCtxReader
m ()
closeHeap
where
closeReader :: Reader m h -> m ()
closeReader = \case
ReadBuffer MutVar (PrimState m) [KOp m h]
_ -> () -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
ReadRun RunReader m h
r -> RunReader m h -> m ()
forall (m :: * -> *) h.
(MonadSTM m, MonadMask m, PrimMonad m) =>
RunReader m h -> m ()
RunReader.close RunReader m h
r
ReadReaders ReadersMergeType
_ SMaybe (Readers m h)
readersMay -> m () -> (Readers m h -> m ()) -> SMaybe (Readers m h) -> m ()
forall b a. b -> (a -> b) -> SMaybe a -> b
smaybe (() -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()) Readers m h -> m ()
forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, PrimMonad m) =>
Readers m h -> m ()
close SMaybe (Readers m h)
readersMay
closeHeap :: m ()
closeHeap =
MutableHeap (PrimState m) (ReadCtx m h) -> m (Maybe (ReadCtx m h))
forall a (m :: * -> *).
(PrimMonad m, Ord a) =>
MutableHeap (PrimState m) a -> m (Maybe a)
Heap.extract MutableHeap (PrimState m) (ReadCtx m h)
readersHeap m (Maybe (ReadCtx m h)) -> (Maybe (ReadCtx m h) -> m ()) -> m ()
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Maybe (ReadCtx m h)
Nothing -> () -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
Just ReadCtx {Reader m h
readCtxReader :: forall (m :: * -> *) h. ReadCtx m h -> Reader m h
readCtxReader :: Reader m h
readCtxReader} -> do
Reader m h -> m ()
forall {h}. Reader m h -> m ()
closeReader Reader m h
readCtxReader
m ()
closeHeap
{-# SPECIALISE peekKey :: Readers IO h -> IO SerialisedKey #-}
peekKey ::
PrimMonad m
=> Readers m h
-> m SerialisedKey
peekKey :: forall (m :: * -> *) h.
PrimMonad m =>
Readers m h -> m SerialisedKey
peekKey Readers {MutableHeap (PrimState m) (ReadCtx m h)
MutVar (PrimState m) (ReadCtx m h)
readersHeap :: forall (m :: * -> *) h.
Readers m h -> MutableHeap (PrimState m) (ReadCtx m h)
readersNext :: forall (m :: * -> *) h.
Readers m h -> MutVar (PrimState m) (ReadCtx m h)
readersHeap :: MutableHeap (PrimState m) (ReadCtx m h)
readersNext :: MutVar (PrimState m) (ReadCtx m h)
..} = do
ReadCtx m h -> SerialisedKey
forall (m :: * -> *) h. ReadCtx m h -> SerialisedKey
readCtxHeadKey (ReadCtx m h -> SerialisedKey)
-> m (ReadCtx m h) -> m SerialisedKey
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> MutVar (PrimState m) (ReadCtx m h) -> m (ReadCtx m h)
forall (m :: * -> *) a.
PrimMonad m =>
MutVar (PrimState m) a -> m a
readMutVar MutVar (PrimState m) (ReadCtx m h)
readersNext
data HasMore = HasMore | Drained
deriving stock (HasMore -> HasMore -> Bool
(HasMore -> HasMore -> Bool)
-> (HasMore -> HasMore -> Bool) -> Eq HasMore
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: HasMore -> HasMore -> Bool
== :: HasMore -> HasMore -> Bool
$c/= :: HasMore -> HasMore -> Bool
/= :: HasMore -> HasMore -> Bool
Eq, Int -> HasMore -> ShowS
[HasMore] -> ShowS
HasMore -> String
(Int -> HasMore -> ShowS)
-> (HasMore -> String) -> ([HasMore] -> ShowS) -> Show HasMore
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> HasMore -> ShowS
showsPrec :: Int -> HasMore -> ShowS
$cshow :: HasMore -> String
show :: HasMore -> String
$cshowList :: [HasMore] -> ShowS
showList :: [HasMore] -> ShowS
Show)
{-# SPECIALISE pop ::
ResolveSerialisedValue
-> Readers IO h
-> IO (SerialisedKey, RunReader.Entry IO h, HasMore) #-}
pop ::
(MonadMask m, MonadSTM m, MonadST m)
=> ResolveSerialisedValue
-> Readers m h
-> m (SerialisedKey, RunReader.Entry m h, HasMore)
pop :: forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadST m) =>
ResolveSerialisedValue
-> Readers m h -> m (SerialisedKey, Entry m h, HasMore)
pop ResolveSerialisedValue
resolve r :: Readers m h
r@Readers {MutableHeap (PrimState m) (ReadCtx m h)
MutVar (PrimState m) (ReadCtx m h)
readersHeap :: forall (m :: * -> *) h.
Readers m h -> MutableHeap (PrimState m) (ReadCtx m h)
readersNext :: forall (m :: * -> *) h.
Readers m h -> MutVar (PrimState m) (ReadCtx m h)
readersHeap :: MutableHeap (PrimState m) (ReadCtx m h)
readersNext :: MutVar (PrimState m) (ReadCtx m h)
..} = do
ReadCtx {SerialisedKey
Entry m h
Reader m h
ReaderNumber
readCtxHeadKey :: forall (m :: * -> *) h. ReadCtx m h -> SerialisedKey
readCtxHeadEntry :: forall (m :: * -> *) h. ReadCtx m h -> Entry m h
readCtxNumber :: forall (m :: * -> *) h. ReadCtx m h -> ReaderNumber
readCtxReader :: forall (m :: * -> *) h. ReadCtx m h -> Reader m h
readCtxHeadKey :: SerialisedKey
readCtxHeadEntry :: Entry m h
readCtxNumber :: ReaderNumber
readCtxReader :: Reader m h
..} <- MutVar (PrimState m) (ReadCtx m h) -> m (ReadCtx m h)
forall (m :: * -> *) a.
PrimMonad m =>
MutVar (PrimState m) a -> m a
readMutVar MutVar (PrimState m) (ReadCtx m h)
readersNext
HasMore
hasMore <- ResolveSerialisedValue
-> Readers m h -> ReaderNumber -> Reader m h -> m HasMore
forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadST m) =>
ResolveSerialisedValue
-> Readers m h -> ReaderNumber -> Reader m h -> m HasMore
dropOne ResolveSerialisedValue
resolve Readers m h
r ReaderNumber
readCtxNumber Reader m h
readCtxReader
(SerialisedKey, Entry m h, HasMore)
-> m (SerialisedKey, Entry m h, HasMore)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (SerialisedKey
readCtxHeadKey, Entry m h
readCtxHeadEntry, HasMore
hasMore)
data ReadersMergeType = MergeLevel | MergeUnion
deriving stock (ReadersMergeType -> ReadersMergeType -> Bool
(ReadersMergeType -> ReadersMergeType -> Bool)
-> (ReadersMergeType -> ReadersMergeType -> Bool)
-> Eq ReadersMergeType
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: ReadersMergeType -> ReadersMergeType -> Bool
== :: ReadersMergeType -> ReadersMergeType -> Bool
$c/= :: ReadersMergeType -> ReadersMergeType -> Bool
/= :: ReadersMergeType -> ReadersMergeType -> Bool
Eq, Int -> ReadersMergeType -> ShowS
[ReadersMergeType] -> ShowS
ReadersMergeType -> String
(Int -> ReadersMergeType -> ShowS)
-> (ReadersMergeType -> String)
-> ([ReadersMergeType] -> ShowS)
-> Show ReadersMergeType
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> ReadersMergeType -> ShowS
showsPrec :: Int -> ReadersMergeType -> ShowS
$cshow :: ReadersMergeType -> String
show :: ReadersMergeType -> String
$cshowList :: [ReadersMergeType] -> ShowS
showList :: [ReadersMergeType] -> ShowS
Show)
{-# SPECIALISE popResolved ::
ResolveSerialisedValue
-> ReadersMergeType
-> Readers IO h
-> IO (SerialisedKey, RunReader.Entry IO h, HasMore) #-}
popResolved ::
forall h m.
(MonadMask m, MonadST m, MonadSTM m)
=> ResolveSerialisedValue
-> ReadersMergeType
-> Readers m h
-> m (SerialisedKey, RunReader.Entry m h, HasMore)
popResolved :: forall h (m :: * -> *).
(MonadMask m, MonadST m, MonadSTM m) =>
ResolveSerialisedValue
-> ReadersMergeType
-> Readers m h
-> m (SerialisedKey, Entry m h, HasMore)
popResolved ResolveSerialisedValue
resolve ReadersMergeType
mergeType Readers m h
readers = m (SerialisedKey, Entry m h, HasMore)
readEntry
where
readEntry :: m (SerialisedKey, RunReader.Entry m h, HasMore)
readEntry :: m (SerialisedKey, Entry m h, HasMore)
readEntry = do
(SerialisedKey
key, Entry m h
entry, HasMore
hasMore) <- ResolveSerialisedValue
-> Readers m h -> m (SerialisedKey, Entry m h, HasMore)
forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadST m) =>
ResolveSerialisedValue
-> Readers m h -> m (SerialisedKey, Entry m h, HasMore)
pop ResolveSerialisedValue
resolve Readers m h
readers
case HasMore
hasMore of
HasMore
Drained -> do
(SerialisedKey, Entry m h, HasMore)
-> m (SerialisedKey, Entry m h, HasMore)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (SerialisedKey
key, Entry m h
entry, HasMore
Drained)
HasMore
HasMore -> do
case ReadersMergeType
mergeType of
ReadersMergeType
MergeLevel -> SerialisedKey
-> Entry SerialisedValue (RawBlobRef m h)
-> m (SerialisedKey, Entry m h, HasMore)
handleLevel SerialisedKey
key (Entry m h -> Entry SerialisedValue (RawBlobRef m h)
forall (m :: * -> *) h.
Entry m h -> Entry SerialisedValue (RawBlobRef m h)
RunReader.toFullEntry Entry m h
entry)
ReadersMergeType
MergeUnion -> SerialisedKey
-> Entry SerialisedValue (RawBlobRef m h)
-> m (SerialisedKey, Entry m h, HasMore)
handleUnion SerialisedKey
key (Entry m h -> Entry SerialisedValue (RawBlobRef m h)
forall (m :: * -> *) h.
Entry m h -> Entry SerialisedValue (RawBlobRef m h)
RunReader.toFullEntry Entry m h
entry)
handleUnion :: SerialisedKey
-> Entry SerialisedValue (RawBlobRef m h)
-> m (SerialisedKey, RunReader.Entry m h, HasMore)
handleUnion :: SerialisedKey
-> Entry SerialisedValue (RawBlobRef m h)
-> m (SerialisedKey, Entry m h, HasMore)
handleUnion SerialisedKey
key Entry SerialisedValue (RawBlobRef m h)
entry = do
SerialisedKey
nextKey <- Readers m h -> m SerialisedKey
forall (m :: * -> *) h.
PrimMonad m =>
Readers m h -> m SerialisedKey
peekKey Readers m h
readers
if SerialisedKey
nextKey SerialisedKey -> SerialisedKey -> Bool
forall a. Eq a => a -> a -> Bool
/= SerialisedKey
key
then
(SerialisedKey, Entry m h, HasMore)
-> m (SerialisedKey, Entry m h, HasMore)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (SerialisedKey
key, Entry SerialisedValue (RawBlobRef m h) -> Entry m h
forall (m :: * -> *) h.
Entry SerialisedValue (RawBlobRef m h) -> Entry m h
RunReader.Entry Entry SerialisedValue (RawBlobRef m h)
entry, HasMore
HasMore)
else do
(SerialisedKey
_, Entry m h
nextEntry, HasMore
hasMore) <- ResolveSerialisedValue
-> Readers m h -> m (SerialisedKey, Entry m h, HasMore)
forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadST m) =>
ResolveSerialisedValue
-> Readers m h -> m (SerialisedKey, Entry m h, HasMore)
pop ResolveSerialisedValue
resolve Readers m h
readers
let resolved :: Entry SerialisedValue (RawBlobRef m h)
resolved = ResolveSerialisedValue
-> Entry SerialisedValue (RawBlobRef m h)
-> Entry SerialisedValue (RawBlobRef m h)
-> Entry SerialisedValue (RawBlobRef m h)
forall v b. (v -> v -> v) -> Entry v b -> Entry v b -> Entry v b
Entry.combineUnion ResolveSerialisedValue
resolve Entry SerialisedValue (RawBlobRef m h)
entry
(Entry m h -> Entry SerialisedValue (RawBlobRef m h)
forall (m :: * -> *) h.
Entry m h -> Entry SerialisedValue (RawBlobRef m h)
RunReader.toFullEntry Entry m h
nextEntry)
case HasMore
hasMore of
HasMore
HasMore -> SerialisedKey
-> Entry SerialisedValue (RawBlobRef m h)
-> m (SerialisedKey, Entry m h, HasMore)
handleUnion SerialisedKey
key Entry SerialisedValue (RawBlobRef m h)
resolved
HasMore
Drained -> (SerialisedKey, Entry m h, HasMore)
-> m (SerialisedKey, Entry m h, HasMore)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (SerialisedKey
key, Entry SerialisedValue (RawBlobRef m h) -> Entry m h
forall (m :: * -> *) h.
Entry SerialisedValue (RawBlobRef m h) -> Entry m h
RunReader.Entry Entry SerialisedValue (RawBlobRef m h)
resolved, HasMore
Drained)
handleLevel :: SerialisedKey
-> Entry SerialisedValue (RawBlobRef m h)
-> m (SerialisedKey, RunReader.Entry m h, HasMore)
handleLevel :: SerialisedKey
-> Entry SerialisedValue (RawBlobRef m h)
-> m (SerialisedKey, Entry m h, HasMore)
handleLevel SerialisedKey
key Entry SerialisedValue (RawBlobRef m h)
entry =
case Entry SerialisedValue (RawBlobRef m h)
entry of
Upsert SerialisedValue
v ->
SerialisedKey
-> SerialisedValue -> m (SerialisedKey, Entry m h, HasMore)
handleMupdate SerialisedKey
key SerialisedValue
v
Entry SerialisedValue (RawBlobRef m h)
_ -> do
HasMore
hasMore' <- SerialisedKey -> m HasMore
dropRemaining SerialisedKey
key
(SerialisedKey, Entry m h, HasMore)
-> m (SerialisedKey, Entry m h, HasMore)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (SerialisedKey
key, Entry SerialisedValue (RawBlobRef m h) -> Entry m h
forall (m :: * -> *) h.
Entry SerialisedValue (RawBlobRef m h) -> Entry m h
RunReader.Entry Entry SerialisedValue (RawBlobRef m h)
entry, HasMore
hasMore')
handleMupdate :: SerialisedKey
-> SerialisedValue
-> m (SerialisedKey, RunReader.Entry m h, HasMore)
handleMupdate :: SerialisedKey
-> SerialisedValue -> m (SerialisedKey, Entry m h, HasMore)
handleMupdate SerialisedKey
key SerialisedValue
v = do
SerialisedKey
nextKey <- Readers m h -> m SerialisedKey
forall (m :: * -> *) h.
PrimMonad m =>
Readers m h -> m SerialisedKey
peekKey Readers m h
readers
if SerialisedKey
nextKey SerialisedKey -> SerialisedKey -> Bool
forall a. Eq a => a -> a -> Bool
/= SerialisedKey
key
then
(SerialisedKey, Entry m h, HasMore)
-> m (SerialisedKey, Entry m h, HasMore)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (SerialisedKey
key, Entry SerialisedValue (RawBlobRef m h) -> Entry m h
forall (m :: * -> *) h.
Entry SerialisedValue (RawBlobRef m h) -> Entry m h
RunReader.Entry (SerialisedValue -> Entry SerialisedValue (RawBlobRef m h)
forall v b. v -> Entry v b
Upsert SerialisedValue
v), HasMore
HasMore)
else do
(SerialisedKey
_, Entry m h
nextEntry, HasMore
hasMore) <- ResolveSerialisedValue
-> Readers m h -> m (SerialisedKey, Entry m h, HasMore)
forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadST m) =>
ResolveSerialisedValue
-> Readers m h -> m (SerialisedKey, Entry m h, HasMore)
pop ResolveSerialisedValue
resolve Readers m h
readers
let resolved :: Entry SerialisedValue (RawBlobRef m h)
resolved = ResolveSerialisedValue
-> Entry SerialisedValue (RawBlobRef m h)
-> Entry SerialisedValue (RawBlobRef m h)
-> Entry SerialisedValue (RawBlobRef m h)
forall v b. (v -> v -> v) -> Entry v b -> Entry v b -> Entry v b
Entry.combine ResolveSerialisedValue
resolve (SerialisedValue -> Entry SerialisedValue (RawBlobRef m h)
forall v b. v -> Entry v b
Upsert SerialisedValue
v)
(Entry m h -> Entry SerialisedValue (RawBlobRef m h)
forall (m :: * -> *) h.
Entry m h -> Entry SerialisedValue (RawBlobRef m h)
RunReader.toFullEntry Entry m h
nextEntry)
case HasMore
hasMore of
HasMore
HasMore -> SerialisedKey
-> Entry SerialisedValue (RawBlobRef m h)
-> m (SerialisedKey, Entry m h, HasMore)
handleLevel SerialisedKey
key Entry SerialisedValue (RawBlobRef m h)
resolved
HasMore
Drained -> (SerialisedKey, Entry m h, HasMore)
-> m (SerialisedKey, Entry m h, HasMore)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (SerialisedKey
key, Entry SerialisedValue (RawBlobRef m h) -> Entry m h
forall (m :: * -> *) h.
Entry SerialisedValue (RawBlobRef m h) -> Entry m h
RunReader.Entry Entry SerialisedValue (RawBlobRef m h)
resolved, HasMore
Drained)
dropRemaining :: SerialisedKey -> m HasMore
dropRemaining :: SerialisedKey -> m HasMore
dropRemaining SerialisedKey
key = do
(Int
_, HasMore
hasMore) <- ResolveSerialisedValue
-> Readers m h -> SerialisedKey -> m (Int, HasMore)
forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadST m) =>
ResolveSerialisedValue
-> Readers m h -> SerialisedKey -> m (Int, HasMore)
dropWhileKey ResolveSerialisedValue
resolve Readers m h
readers SerialisedKey
key
HasMore -> m HasMore
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure HasMore
hasMore
{-# SPECIALISE dropWhileKey ::
ResolveSerialisedValue
-> Readers IO h
-> SerialisedKey
-> IO (Int, HasMore) #-}
dropWhileKey ::
(MonadMask m, MonadSTM m, MonadST m)
=> ResolveSerialisedValue
-> Readers m h
-> SerialisedKey
-> m (Int, HasMore)
dropWhileKey :: forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadST m) =>
ResolveSerialisedValue
-> Readers m h -> SerialisedKey -> m (Int, HasMore)
dropWhileKey ResolveSerialisedValue
resolve Readers {MutableHeap (PrimState m) (ReadCtx m h)
MutVar (PrimState m) (ReadCtx m h)
readersHeap :: forall (m :: * -> *) h.
Readers m h -> MutableHeap (PrimState m) (ReadCtx m h)
readersNext :: forall (m :: * -> *) h.
Readers m h -> MutVar (PrimState m) (ReadCtx m h)
readersHeap :: MutableHeap (PrimState m) (ReadCtx m h)
readersNext :: MutVar (PrimState m) (ReadCtx m h)
..} SerialisedKey
key = do
ReadCtx m h
cur <- MutVar (PrimState m) (ReadCtx m h) -> m (ReadCtx m h)
forall (m :: * -> *) a.
PrimMonad m =>
MutVar (PrimState m) a -> m a
readMutVar MutVar (PrimState m) (ReadCtx m h)
readersNext
if ReadCtx m h -> SerialisedKey
forall (m :: * -> *) h. ReadCtx m h -> SerialisedKey
readCtxHeadKey ReadCtx m h
cur SerialisedKey -> SerialisedKey -> Bool
forall a. Ord a => a -> a -> Bool
<= SerialisedKey
key
then Int -> ReadCtx m h -> m (Int, HasMore)
go Int
0 ReadCtx m h
cur
else (Int, HasMore) -> m (Int, HasMore)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int
0, HasMore
HasMore)
where
go :: Int -> ReadCtx m h -> m (Int, HasMore)
go !Int
n ReadCtx {ReaderNumber
readCtxNumber :: forall (m :: * -> *) h. ReadCtx m h -> ReaderNumber
readCtxNumber :: ReaderNumber
readCtxNumber, Reader m h
readCtxReader :: forall (m :: * -> *) h. ReadCtx m h -> Reader m h
readCtxReader :: Reader m h
readCtxReader} = do
Maybe (ReadCtx m h)
mNext <- ResolveSerialisedValue
-> ReaderNumber -> Reader m h -> m (Maybe (ReadCtx m h))
forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadST m) =>
ResolveSerialisedValue
-> ReaderNumber -> Reader m h -> m (Maybe (ReadCtx m h))
nextReadCtx ResolveSerialisedValue
resolve ReaderNumber
readCtxNumber Reader m h
readCtxReader m (Maybe (ReadCtx m h))
-> (Maybe (ReadCtx m h) -> m (Maybe (ReadCtx m h)))
-> m (Maybe (ReadCtx m h))
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Maybe (ReadCtx m h)
Nothing -> MutableHeap (PrimState m) (ReadCtx m h) -> m (Maybe (ReadCtx m h))
forall a (m :: * -> *).
(PrimMonad m, Ord a) =>
MutableHeap (PrimState m) a -> m (Maybe a)
Heap.extract MutableHeap (PrimState m) (ReadCtx m h)
readersHeap
Just ReadCtx m h
ctx -> ReadCtx m h -> Maybe (ReadCtx m h)
forall a. a -> Maybe a
Just (ReadCtx m h -> Maybe (ReadCtx m h))
-> m (ReadCtx m h) -> m (Maybe (ReadCtx m h))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> MutableHeap (PrimState m) (ReadCtx m h)
-> ReadCtx m h -> m (ReadCtx m h)
forall a (m :: * -> *).
(PrimMonad m, Ord a) =>
MutableHeap (PrimState m) a -> a -> m a
Heap.replaceRoot MutableHeap (PrimState m) (ReadCtx m h)
readersHeap ReadCtx m h
ctx
let !n' :: Int
n' = Int
n Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1
case Maybe (ReadCtx m h)
mNext of
Maybe (ReadCtx m h)
Nothing -> do
(Int, HasMore) -> m (Int, HasMore)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int
n', HasMore
Drained)
Just ReadCtx m h
next -> do
if ReadCtx m h -> SerialisedKey
forall (m :: * -> *) h. ReadCtx m h -> SerialisedKey
readCtxHeadKey ReadCtx m h
next SerialisedKey -> SerialisedKey -> Bool
forall a. Ord a => a -> a -> Bool
<= SerialisedKey
key
then
Int -> ReadCtx m h -> m (Int, HasMore)
go Int
n' ReadCtx m h
next
else do
MutVar (PrimState m) (ReadCtx m h) -> ReadCtx m h -> m ()
forall (m :: * -> *) a.
PrimMonad m =>
MutVar (PrimState m) a -> a -> m ()
writeMutVar MutVar (PrimState m) (ReadCtx m h)
readersNext ReadCtx m h
next
(Int, HasMore) -> m (Int, HasMore)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int
n', HasMore
HasMore)
{-# SPECIALISE dropOne ::
ResolveSerialisedValue
-> Readers IO h
-> ReaderNumber
-> Reader IO h
-> IO HasMore #-}
dropOne ::
(MonadMask m, MonadSTM m, MonadST m)
=> ResolveSerialisedValue
-> Readers m h
-> ReaderNumber
-> Reader m h
-> m HasMore
dropOne :: forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadST m) =>
ResolveSerialisedValue
-> Readers m h -> ReaderNumber -> Reader m h -> m HasMore
dropOne ResolveSerialisedValue
resolve Readers {MutableHeap (PrimState m) (ReadCtx m h)
MutVar (PrimState m) (ReadCtx m h)
readersHeap :: forall (m :: * -> *) h.
Readers m h -> MutableHeap (PrimState m) (ReadCtx m h)
readersNext :: forall (m :: * -> *) h.
Readers m h -> MutVar (PrimState m) (ReadCtx m h)
readersHeap :: MutableHeap (PrimState m) (ReadCtx m h)
readersNext :: MutVar (PrimState m) (ReadCtx m h)
..} ReaderNumber
number Reader m h
reader = do
Maybe (ReadCtx m h)
mNext <- ResolveSerialisedValue
-> ReaderNumber -> Reader m h -> m (Maybe (ReadCtx m h))
forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadST m) =>
ResolveSerialisedValue
-> ReaderNumber -> Reader m h -> m (Maybe (ReadCtx m h))
nextReadCtx ResolveSerialisedValue
resolve ReaderNumber
number Reader m h
reader m (Maybe (ReadCtx m h))
-> (Maybe (ReadCtx m h) -> m (Maybe (ReadCtx m h)))
-> m (Maybe (ReadCtx m h))
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Maybe (ReadCtx m h)
Nothing -> MutableHeap (PrimState m) (ReadCtx m h) -> m (Maybe (ReadCtx m h))
forall a (m :: * -> *).
(PrimMonad m, Ord a) =>
MutableHeap (PrimState m) a -> m (Maybe a)
Heap.extract MutableHeap (PrimState m) (ReadCtx m h)
readersHeap
Just ReadCtx m h
ctx -> ReadCtx m h -> Maybe (ReadCtx m h)
forall a. a -> Maybe a
Just (ReadCtx m h -> Maybe (ReadCtx m h))
-> m (ReadCtx m h) -> m (Maybe (ReadCtx m h))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> MutableHeap (PrimState m) (ReadCtx m h)
-> ReadCtx m h -> m (ReadCtx m h)
forall a (m :: * -> *).
(PrimMonad m, Ord a) =>
MutableHeap (PrimState m) a -> a -> m a
Heap.replaceRoot MutableHeap (PrimState m) (ReadCtx m h)
readersHeap ReadCtx m h
ctx
case Maybe (ReadCtx m h)
mNext of
Maybe (ReadCtx m h)
Nothing ->
HasMore -> m HasMore
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure HasMore
Drained
Just ReadCtx m h
next -> do
MutVar (PrimState m) (ReadCtx m h) -> ReadCtx m h -> m ()
forall (m :: * -> *) a.
PrimMonad m =>
MutVar (PrimState m) a -> a -> m ()
writeMutVar MutVar (PrimState m) (ReadCtx m h)
readersNext ReadCtx m h
next
HasMore -> m HasMore
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure HasMore
HasMore
{-# SPECIALISE nextReadCtx ::
ResolveSerialisedValue
-> ReaderNumber
-> Reader IO h
-> IO (Maybe (ReadCtx IO h)) #-}
nextReadCtx ::
(MonadMask m, MonadSTM m, MonadST m)
=> ResolveSerialisedValue
-> ReaderNumber
-> Reader m h
-> m (Maybe (ReadCtx m h))
nextReadCtx :: forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadST m) =>
ResolveSerialisedValue
-> ReaderNumber -> Reader m h -> m (Maybe (ReadCtx m h))
nextReadCtx ResolveSerialisedValue
resolve ReaderNumber
readCtxNumber Reader m h
readCtxReader =
case Reader m h
readCtxReader of
ReadBuffer MutVar (PrimState m) [KOp m h]
r -> MutVar (PrimState m) [KOp m h]
-> ([KOp m h] -> ([KOp m h], Maybe (ReadCtx m h)))
-> m (Maybe (ReadCtx m h))
forall (m :: * -> *) a b.
PrimMonad m =>
MutVar (PrimState m) a -> (a -> (a, b)) -> m b
atomicModifyMutVar MutVar (PrimState m) [KOp m h]
r (([KOp m h] -> ([KOp m h], Maybe (ReadCtx m h)))
-> m (Maybe (ReadCtx m h)))
-> ([KOp m h] -> ([KOp m h], Maybe (ReadCtx m h)))
-> m (Maybe (ReadCtx m h))
forall a b. (a -> b) -> a -> b
$ \case
[] ->
([], Maybe (ReadCtx m h)
forall a. Maybe a
Nothing)
((SerialisedKey
readCtxHeadKey, Entry SerialisedValue (RawBlobRef m h)
e) : [KOp m h]
rest) ->
let readCtxHeadEntry :: Entry m h
readCtxHeadEntry = Entry SerialisedValue (RawBlobRef m h) -> Entry m h
forall (m :: * -> *) h.
Entry SerialisedValue (RawBlobRef m h) -> Entry m h
RunReader.Entry Entry SerialisedValue (RawBlobRef m h)
e
in ([KOp m h]
rest, ReadCtx m h -> Maybe (ReadCtx m h)
forall a. a -> Maybe a
Just ReadCtx {SerialisedKey
Entry m h
Reader m h
ReaderNumber
readCtxHeadKey :: SerialisedKey
readCtxHeadEntry :: Entry m h
readCtxNumber :: ReaderNumber
readCtxReader :: Reader m h
readCtxNumber :: ReaderNumber
readCtxReader :: Reader m h
readCtxHeadKey :: SerialisedKey
readCtxHeadEntry :: Entry m h
..})
ReadRun RunReader m h
r -> RunReader m h -> m (Result m h)
forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadST m) =>
RunReader m h -> m (Result m h)
RunReader.next RunReader m h
r m (Result m h)
-> (Result m h -> Maybe (ReadCtx m h)) -> m (Maybe (ReadCtx m h))
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \case
Result m h
RunReader.Empty ->
Maybe (ReadCtx m h)
forall a. Maybe a
Nothing
RunReader.ReadEntry SerialisedKey
readCtxHeadKey Entry m h
readCtxHeadEntry ->
ReadCtx m h -> Maybe (ReadCtx m h)
forall a. a -> Maybe a
Just ReadCtx {SerialisedKey
Entry m h
Reader m h
ReaderNumber
readCtxHeadKey :: SerialisedKey
readCtxHeadEntry :: Entry m h
readCtxNumber :: ReaderNumber
readCtxReader :: Reader m h
readCtxNumber :: ReaderNumber
readCtxReader :: Reader m h
readCtxHeadKey :: SerialisedKey
readCtxHeadEntry :: Entry m h
..}
ReadReaders ReadersMergeType
mergeType SMaybe (Readers m h)
readersMay -> case SMaybe (Readers m h)
readersMay of
SMaybe (Readers m h)
SNothing ->
Maybe (ReadCtx m h) -> m (Maybe (ReadCtx m h))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe (ReadCtx m h)
forall a. Maybe a
Nothing
SJust Readers m h
readers -> do
(SerialisedKey
readCtxHeadKey, Entry m h
readCtxHeadEntry, HasMore
hasMore) <-
ResolveSerialisedValue
-> ReadersMergeType
-> Readers m h
-> m (SerialisedKey, Entry m h, HasMore)
forall h (m :: * -> *).
(MonadMask m, MonadST m, MonadSTM m) =>
ResolveSerialisedValue
-> ReadersMergeType
-> Readers m h
-> m (SerialisedKey, Entry m h, HasMore)
popResolved ResolveSerialisedValue
resolve ReadersMergeType
mergeType Readers m h
readers
let readersMay' :: SMaybe (Readers m h)
readersMay' = case HasMore
hasMore of
HasMore
Drained -> SMaybe (Readers m h)
forall a. SMaybe a
SNothing
HasMore
HasMore -> Readers m h -> SMaybe (Readers m h)
forall a. a -> SMaybe a
SJust Readers m h
readers
Maybe (ReadCtx m h) -> m (Maybe (ReadCtx m h))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe (ReadCtx m h) -> m (Maybe (ReadCtx m h)))
-> Maybe (ReadCtx m h) -> m (Maybe (ReadCtx m h))
forall a b. (a -> b) -> a -> b
$ ReadCtx m h -> Maybe (ReadCtx m h)
forall a. a -> Maybe a
Just ReadCtx {
readCtxReader :: Reader m h
readCtxReader = ReadersMergeType -> SMaybe (Readers m h) -> Reader m h
forall (m :: * -> *) h.
ReadersMergeType -> SMaybe (Readers m h) -> Reader m h
ReadReaders ReadersMergeType
mergeType SMaybe (Readers m h)
readersMay'
, SerialisedKey
Entry m h
ReaderNumber
readCtxHeadKey :: SerialisedKey
readCtxHeadEntry :: Entry m h
readCtxNumber :: ReaderNumber
readCtxNumber :: ReaderNumber
readCtxHeadKey :: SerialisedKey
readCtxHeadEntry :: Entry m h
..
}