{-# OPTIONS_HADDOCK not-home #-}
module Database.LSMTree.Internal.RunReaders (
Readers (..)
, OffsetKey (..)
, new
, close
, peekKey
, HasMore (..)
, pop
, dropWhileKey
, Reader (..)
, ReaderNumber (..)
, ReadCtx (..)
) where
import Control.Monad (zipWithM)
import Control.Monad.Class.MonadST (MonadST)
import Control.Monad.Class.MonadSTM (MonadSTM (..))
import Control.Monad.Class.MonadThrow (MonadMask)
import Control.Monad.Primitive
import Control.RefCount
import Data.Function (on)
import Data.Functor ((<&>))
import Data.List.NonEmpty (nonEmpty)
import qualified Data.Map.Strict as Map
import Data.Maybe (catMaybes)
import Data.Primitive.MutVar
import Data.Traversable (for)
import qualified Data.Vector as V
import Database.LSMTree.Internal.BlobRef (RawBlobRef)
import Database.LSMTree.Internal.Entry (Entry (..))
import Database.LSMTree.Internal.Run (Run)
import Database.LSMTree.Internal.RunReader (OffsetKey (..),
RunReader (..))
import qualified Database.LSMTree.Internal.RunReader as Reader
import Database.LSMTree.Internal.Serialise
import qualified Database.LSMTree.Internal.WriteBuffer as WB
import qualified Database.LSMTree.Internal.WriteBufferBlobs as WB
import qualified KMerge.Heap as Heap
import qualified System.FS.API as FS
-- | A collection of inputs (an optional write buffer plus runs) that are read
-- from incrementally.
--
-- Construct with 'new', inspect the current key with 'peekKey', consume
-- entries with 'pop' or 'dropWhileKey', and release the underlying run
-- readers with 'close'.
data Readers m h = Readers {
      -- | Mutable heap of read contexts, ordered by the 'Ord' instance of
      -- 'ReadCtx' (head key first, then reader number).
      readersHeap :: !(Heap.MutableHeap (PrimState m) (ReadCtx m h))
      -- | The read context whose head key and entry are returned by 'peekKey'
      -- and 'pop'. It is overwritten whenever advancing produces a further
      -- entry; once everything is drained it is left at its last value.
    , readersNext :: !(MutVar (PrimState m) (ReadCtx m h))
    }
-- | Identifies one input of a 'Readers'.
--
-- 'new' assigns number 0 to the write buffer (if any) and numbers 1, 2, ... to
-- the runs in the order they appear in the input vector. The number is the
-- tie-breaker in the 'Eq' and 'Ord' instances of 'ReadCtx'.
newtype ReaderNumber = ReaderNumber Int
  deriving stock (Eq, Ord)
-- | A reader together with the entry most recently read from it.
--
-- Keeping the head key in the record lets the 'Eq' and 'Ord' instances compare
-- two contexts purely, without touching the reader.
data ReadCtx m h = ReadCtx {
      -- | Key of the entry most recently read from 'readCtxReader'.
      readCtxHeadKey   :: !SerialisedKey
      -- | The entry belonging to 'readCtxHeadKey'.
    , readCtxHeadEntry :: !(Reader.Entry m h)
      -- | Number of the input this context belongs to; breaks ties between
      -- equal head keys.
    , readCtxNumber    :: !ReaderNumber
      -- | The reader that further entries are obtained from.
    , readCtxReader    :: !(Reader m h)
    }
-- | Two contexts are equal when both their head key and their reader number
-- agree. The head entry and the reader itself are not inspected.
instance Eq (ReadCtx m h) where
  (==) = (==) `on` (\r -> (readCtxHeadKey r, readCtxNumber r))
-- | Orders contexts by head key first and by reader number second, so that
-- for equal keys the context with the smaller 'ReaderNumber' sorts first.
instance Ord (ReadCtx m h) where
  compare = compare `on` (\r -> (readCtxHeadKey r, readCtxNumber r))
-- | A single input that entries can be read from one at a time.
data Reader m h
  = -- | Reads from a run through a 'RunReader'.
    ReadRun !(RunReader m h)
  | -- | Reads from a write buffer. The mutable variable holds the key\/entry
    -- pairs that have not been handed out yet; reading removes the head of
    -- the list.
    ReadBuffer !(MutVar (PrimState m) [KOp m h])
-- | A key paired with its entry, where blobs are referenced through
-- 'RawBlobRef's. This is the element type of the list held by 'ReadBuffer'.
type KOp m h = (SerialisedKey, Entry SerialisedValue (RawBlobRef m h))
{-# SPECIALISE new ::
     OffsetKey
  -> Maybe (WB.WriteBuffer, Ref (WB.WriteBufferBlobs IO h))
  -> V.Vector (Ref (Run IO h))
  -> IO (Maybe (Readers IO h)) #-}
-- | Create a 'Readers' over an optional write buffer and a vector of runs.
--
-- The write buffer (if present) becomes reader number 0 and the runs become
-- readers 1, 2, ... in vector order. The first entry of every input is read
-- eagerly; inputs that yield no entry are left out.
--
-- Returns 'Nothing' when no input yields an entry at all.
--
-- The 'OffsetKey' is applied to the write buffer by dropping all keys smaller
-- than the offset, and is passed on unchanged to 'Reader.new' for each run.
--
-- NOTE(review): there is no exception handling here, so if opening or reading
-- a later input throws, the run readers opened before it are not closed by
-- this function. Confirm that callers account for this.
new :: forall m h.
     (MonadMask m, MonadST m, MonadSTM m)
  => OffsetKey
  -> Maybe (WB.WriteBuffer, Ref (WB.WriteBufferBlobs m h))
  -> V.Vector (Ref (Run m h))
  -> m (Maybe (Readers m h))
new !offsetKey wbs runs = do
    wBuffer <- maybe (pure Nothing) (uncurry fromWB) wbs
    readers <- zipWithM (fromRun . ReaderNumber) [1..] (V.toList runs)
    -- Only inputs that produced a first entry take part.
    let contexts = nonEmpty . catMaybes $ wBuffer : readers
    for contexts $ \xs -> do
      (readersHeap, readCtx) <- Heap.newMutableHeap xs
      readersNext <- newMutVar readCtx
      return Readers {..}
  where
    -- Turn the write buffer into reader number 0 and read its first entry.
    fromWB :: WB.WriteBuffer
           -> Ref (WB.WriteBufferBlobs m h)
           -> m (Maybe (ReadCtx m h))
    fromWB wb wbblobs = do
        -- The outer 'fmap' maps over the pair's second component, the inner
        -- one over the blob reference inside the entry.
        kops <- newMutVar $ map (fmap (fmap (WB.mkRawBlobRef wbblobs))) $
                  Map.toList $ filterWB $ WB.toMap wb
        nextReadCtx (ReaderNumber 0) (ReadBuffer kops)
      where
        -- Skip all write buffer keys strictly below the offset key.
        filterWB = case offsetKey of
            NoOffsetKey -> id
            OffsetKey k -> Map.dropWhileAntitone (< k)

    -- Open a reader on one run and read its first entry.
    fromRun :: ReaderNumber -> Ref (Run m h) -> m (Maybe (ReadCtx m h))
    fromRun n run = do
        reader <- Reader.new offsetKey run
        nextReadCtx n (ReadRun reader)
{-# SPECIALISE close ::
     Readers IO (FS.Handle h)
  -> IO () #-}
-- | Close every reader held by the 'Readers': first the one referenced by
-- 'readersNext', then each one still stored in the heap.
--
-- Only run readers are closed via 'Reader.close'; write buffer readers hold
-- nothing that needs closing. The heap is emptied as a side effect.
close ::
     (MonadMask m, MonadSTM m, PrimMonad m)
  => Readers m h
  -> m ()
close Readers {..} = do
    ReadCtx {readCtxReader} <- readMutVar readersNext
    closeReader readCtxReader
    closeHeap
  where
    closeReader = \case
        ReadRun r    -> Reader.close r
        ReadBuffer _ -> pure ()

    -- Extract contexts one by one until the heap reports it is empty.
    closeHeap =
        Heap.extract readersHeap >>= \case
          Nothing -> return ()
          Just ReadCtx {readCtxReader} -> do
            closeReader readCtxReader
            closeHeap
{-# SPECIALISE peekKey ::
     Readers IO h
  -> IO SerialisedKey #-}
-- | Return the head key of the context currently stored in 'readersNext',
-- without advancing any reader.
peekKey ::
     PrimMonad m
  => Readers m h
  -> m SerialisedKey
peekKey Readers {..} = do
    readCtxHeadKey <$> readMutVar readersNext
-- | Whether a 'Readers' can still produce entries after an operation.
--
-- 'Drained' is reported when neither the advanced reader nor the heap yielded
-- a further entry; 'HasMore' otherwise.
data HasMore = HasMore | Drained
  deriving stock (Eq, Show)
{-# SPECIALISE pop ::
     Readers IO h
  -> IO (SerialisedKey, Reader.Entry IO h, HasMore) #-}
-- | Return the key and entry currently at the front, and advance.
--
-- The returned 'HasMore' tells whether a further entry is available after
-- this one. It comes from 'dropOne', which advances the reader that produced
-- the returned entry.
pop ::
     (MonadMask m, MonadSTM m, MonadST m)
  => Readers m h
  -> m (SerialisedKey, Reader.Entry m h, HasMore)
pop r@Readers {..} = do
    ReadCtx {..} <- readMutVar readersNext
    hasMore <- dropOne r readCtxNumber readCtxReader
    return (readCtxHeadKey, readCtxHeadEntry, hasMore)
{-# SPECIALISE dropWhileKey ::
     Readers IO h
  -> SerialisedKey
  -> IO (Int, HasMore) #-}
-- | Drop entries from the front for as long as their key equals the given
-- key.
--
-- Returns the number of entries dropped and whether further entries remain.
-- If the current front key differs from the given key, nothing is dropped and
-- the result is @(0, 'HasMore')@.
dropWhileKey ::
     (MonadMask m, MonadSTM m, MonadST m)
  => Readers m h
  -> SerialisedKey
  -> m (Int, HasMore)
dropWhileKey Readers {..} key = do
    cur <- readMutVar readersNext
    if readCtxHeadKey cur == key
      then go 0 cur
      else return (0, HasMore)
  where
    -- Same advancing step as 'dropOne', but 'readersNext' is only written
    -- once, when the first context with a different key has been found.
    go !n ReadCtx {readCtxNumber, readCtxReader} = do
        mNext <- nextReadCtx readCtxNumber readCtxReader >>= \case
          Nothing  -> Heap.extract readersHeap
          Just ctx -> Just <$> Heap.replaceRoot readersHeap ctx
        let !n' = n + 1
        case mNext of
          Nothing -> do
            -- Nothing left anywhere; 'readersNext' keeps its old value.
            return (n', Drained)
          Just next -> do
            if readCtxHeadKey next == key
              then
                go n' next
              else do
                writeMutVar readersNext next
                return (n', HasMore)
{-# SPECIALISE dropOne ::
     Readers IO h
  -> ReaderNumber
  -> Reader IO h
  -> IO HasMore #-}
-- | Advance the given reader by one entry and update the 'Readers' to match.
--
-- If the reader yields another entry, its new context is handed to
-- 'Heap.replaceRoot'; otherwise 'Heap.extract' is called. If that step
-- returns a context, it is stored in 'readersNext' and the result is
-- 'HasMore'. If it returns 'Nothing', 'readersNext' is left untouched and the
-- result is 'Drained'.
dropOne ::
     (MonadMask m, MonadSTM m, MonadST m)
  => Readers m h
  -> ReaderNumber
  -> Reader m h
  -> m HasMore
dropOne Readers {..} number reader = do
    mNext <- nextReadCtx number reader >>= \case
      Nothing  -> Heap.extract readersHeap
      Just ctx -> Just <$> Heap.replaceRoot readersHeap ctx
    case mNext of
      Nothing ->
        return Drained
      Just next -> do
        writeMutVar readersNext next
        return HasMore
{-# SPECIALISE nextReadCtx ::
     ReaderNumber
  -> Reader IO h
  -> IO (Maybe (ReadCtx IO h)) #-}
-- | Read the next entry from a reader and wrap it in a fresh 'ReadCtx'
-- carrying the given reader number and the reader itself.
--
-- Returns 'Nothing' when the reader has no further entry: 'Reader.Empty' for
-- a run reader, or an empty list for a write buffer reader.
nextReadCtx ::
     (MonadMask m, MonadSTM m, MonadST m)
  => ReaderNumber
  -> Reader m h
  -> m (Maybe (ReadCtx m h))
nextReadCtx readCtxNumber readCtxReader =
    case readCtxReader of
      ReadRun r -> Reader.next r <&> \case
        Reader.Empty ->
          Nothing
        Reader.ReadEntry readCtxHeadKey readCtxHeadEntry ->
          -- The record wildcard picks up the two arguments and the two
          -- names bound by this pattern.
          Just ReadCtx {..}
      ReadBuffer r -> atomicModifyMutVar r $ \case
        [] ->
          ([], Nothing)
        ((readCtxHeadKey, e) : rest) ->
          -- Pop the head of the list; the tail stays in the variable.
          let readCtxHeadEntry = Reader.Entry e
          in  (rest, Just ReadCtx {..})