{-# OPTIONS_HADDOCK not-home #-}

module Database.LSMTree.Internal.RunReaders (
    Readers (..)
  , OffsetKey (..)
  , new
  , close
  , peekKey
  , HasMore (..)
  , pop
  , dropWhileKey
    -- * Internals
  , Reader (..)
  , ReaderNumber (..)
  , ReadCtx (..)
  ) where

import           Control.Monad (zipWithM)
import           Control.Monad.Class.MonadST (MonadST)
import           Control.Monad.Class.MonadSTM (MonadSTM (..))
import           Control.Monad.Class.MonadThrow (MonadMask)
import           Control.Monad.Primitive
import           Control.RefCount
import           Data.Function (on)
import           Data.Functor ((<&>))
import           Data.List.NonEmpty (nonEmpty)
import qualified Data.Map.Strict as Map
import           Data.Maybe (catMaybes)
import           Data.Primitive.MutVar
import           Data.Traversable (for)
import qualified Data.Vector as V
import           Database.LSMTree.Internal.BlobRef (RawBlobRef)
import           Database.LSMTree.Internal.Entry (Entry (..))
import           Database.LSMTree.Internal.Run (Run)
import           Database.LSMTree.Internal.RunReader (OffsetKey (..),
                     RunReader (..))
import qualified Database.LSMTree.Internal.RunReader as Reader
import           Database.LSMTree.Internal.Serialise
import qualified Database.LSMTree.Internal.WriteBuffer as WB
import qualified Database.LSMTree.Internal.WriteBufferBlobs as WB
import qualified KMerge.Heap as Heap
import qualified System.FS.API as FS

-- | A collection of runs and write buffers being read from, yielding elements
-- in order. More precisely, that means first ordered by their key, then by the
-- input run they came from. This is important for resolving multiple entries
-- with the same key into one.
--
-- Construct with 'new', then keep calling 'pop'.
-- If aborting early, remember to call 'close'!
--
-- Creating a 'Readers' does not retain a reference to the input 'Run's or the
-- 'WriteBufferBlobs', but does retain an independent reference on their blob
-- files. It is not necessary to separately retain the 'Run's or the
-- 'WriteBufferBlobs' for correct use of the 'Readers'. There is one important
-- caveat however: to preserve the validity of 'BlobRef's then it is necessary
-- to separately retain a reference to the 'Run' or its 'BlobFile' to preserve
-- the validity of 'BlobRefs'.
--
-- TODO: do this more nicely by changing 'Reader' to preserve the 'BlobFile'
-- ref until it is explicitly closed, and also retain the 'BlobFile' from the
-- WBB and release all of these 'BlobFiles' once the 'Readers' is itself closed.
--
data Readers m h = Readers {
      forall (m :: * -> *) h.
Readers m h -> MutableHeap (PrimState m) (ReadCtx m h)
readersHeap :: !(Heap.MutableHeap (PrimState m) (ReadCtx m h))
      -- | Since there is always one reader outside of the heap, we need to
      -- store it separately. This also contains the next k\/op to yield, unless
      -- all readers are drained, i.e. both:
      -- 1. the reader inside the 'ReadCtx' is empty
      -- 2. the heap is empty
    , forall (m :: * -> *) h.
Readers m h -> MutVar (PrimState m) (ReadCtx m h)
readersNext :: !(MutVar (PrimState m) (ReadCtx m h))
    }

newtype ReaderNumber = ReaderNumber Int
  deriving stock (ReaderNumber -> ReaderNumber -> Bool
(ReaderNumber -> ReaderNumber -> Bool)
-> (ReaderNumber -> ReaderNumber -> Bool) -> Eq ReaderNumber
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: ReaderNumber -> ReaderNumber -> Bool
== :: ReaderNumber -> ReaderNumber -> Bool
$c/= :: ReaderNumber -> ReaderNumber -> Bool
/= :: ReaderNumber -> ReaderNumber -> Bool
Eq, Eq ReaderNumber
Eq ReaderNumber =>
(ReaderNumber -> ReaderNumber -> Ordering)
-> (ReaderNumber -> ReaderNumber -> Bool)
-> (ReaderNumber -> ReaderNumber -> Bool)
-> (ReaderNumber -> ReaderNumber -> Bool)
-> (ReaderNumber -> ReaderNumber -> Bool)
-> (ReaderNumber -> ReaderNumber -> ReaderNumber)
-> (ReaderNumber -> ReaderNumber -> ReaderNumber)
-> Ord ReaderNumber
ReaderNumber -> ReaderNumber -> Bool
ReaderNumber -> ReaderNumber -> Ordering
ReaderNumber -> ReaderNumber -> ReaderNumber
forall a.
Eq a =>
(a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
$ccompare :: ReaderNumber -> ReaderNumber -> Ordering
compare :: ReaderNumber -> ReaderNumber -> Ordering
$c< :: ReaderNumber -> ReaderNumber -> Bool
< :: ReaderNumber -> ReaderNumber -> Bool
$c<= :: ReaderNumber -> ReaderNumber -> Bool
<= :: ReaderNumber -> ReaderNumber -> Bool
$c> :: ReaderNumber -> ReaderNumber -> Bool
> :: ReaderNumber -> ReaderNumber -> Bool
$c>= :: ReaderNumber -> ReaderNumber -> Bool
>= :: ReaderNumber -> ReaderNumber -> Bool
$cmax :: ReaderNumber -> ReaderNumber -> ReaderNumber
max :: ReaderNumber -> ReaderNumber -> ReaderNumber
$cmin :: ReaderNumber -> ReaderNumber -> ReaderNumber
min :: ReaderNumber -> ReaderNumber -> ReaderNumber
Ord)

-- | Each heap element needs some more context than just the reader.
-- E.g. the 'Eq' instance we need to be able to access the first key to be read
-- in a pure way.
--
-- TODO(optimisation): We allocate this record for each k/op. This might be
-- avoidable, see ideas below.
data ReadCtx m h = ReadCtx {
      -- We could avoid this using a more specialised mutable heap with separate
      -- arrays for keys and values (or even each of their components).
      -- Using an 'STRef' could avoid reallocating the record for every entry,
      -- but that might not be straightforward to integrate with the heap.
      forall (m :: * -> *) h. ReadCtx m h -> SerialisedKey
readCtxHeadKey   :: !SerialisedKey
    , forall (m :: * -> *) h. ReadCtx m h -> Entry m h
readCtxHeadEntry :: !(Reader.Entry m h)
      -- We could get rid of this by making 'LoserTree' stable (for which there
      -- is a prototype already).
      -- Alternatively, if we decide to have an invariant that the number in
      -- 'RunFsPaths' is always higher for newer runs, then we could use that
      -- in the 'Ord' instance.
    , forall (m :: * -> *) h. ReadCtx m h -> ReaderNumber
readCtxNumber    :: !ReaderNumber
    , forall (m :: * -> *) h. ReadCtx m h -> Reader m h
readCtxReader    :: !(Reader m h)
    }

instance Eq (ReadCtx m h) where
  == :: ReadCtx m h -> ReadCtx m h -> Bool
(==) = (SerialisedKey, ReaderNumber)
-> (SerialisedKey, ReaderNumber) -> Bool
forall a. Eq a => a -> a -> Bool
(==) ((SerialisedKey, ReaderNumber)
 -> (SerialisedKey, ReaderNumber) -> Bool)
-> (ReadCtx m h -> (SerialisedKey, ReaderNumber))
-> ReadCtx m h
-> ReadCtx m h
-> Bool
forall b c a. (b -> b -> c) -> (a -> b) -> a -> a -> c
`on` (\ReadCtx m h
r -> (ReadCtx m h -> SerialisedKey
forall (m :: * -> *) h. ReadCtx m h -> SerialisedKey
readCtxHeadKey ReadCtx m h
r, ReadCtx m h -> ReaderNumber
forall (m :: * -> *) h. ReadCtx m h -> ReaderNumber
readCtxNumber ReadCtx m h
r))

-- | Makes sure we resolve entries in the right order.
instance Ord (ReadCtx m h) where
  compare :: ReadCtx m h -> ReadCtx m h -> Ordering
compare = (SerialisedKey, ReaderNumber)
-> (SerialisedKey, ReaderNumber) -> Ordering
forall a. Ord a => a -> a -> Ordering
compare ((SerialisedKey, ReaderNumber)
 -> (SerialisedKey, ReaderNumber) -> Ordering)
-> (ReadCtx m h -> (SerialisedKey, ReaderNumber))
-> ReadCtx m h
-> ReadCtx m h
-> Ordering
forall b c a. (b -> b -> c) -> (a -> b) -> a -> a -> c
`on` (\ReadCtx m h
r -> (ReadCtx m h -> SerialisedKey
forall (m :: * -> *) h. ReadCtx m h -> SerialisedKey
readCtxHeadKey ReadCtx m h
r, ReadCtx m h -> ReaderNumber
forall (m :: * -> *) h. ReadCtx m h -> ReaderNumber
readCtxNumber ReadCtx m h
r))

-- TODO: This is slightly inelegant. This module could work generally for
-- anything that can produce elements, but currently is very specific to having
-- write buffer and run readers. Also, for run merging, no write buffer is
-- involved, but we still need to branch on this sum type.
-- A more general version is possible, but despite SPECIALISE-ing everything
-- showed ~100 bytes of extra allocations per entry that is read (which might be
-- avoidable with some tinkering).
data Reader m h =
    ReadRun    !(RunReader m h)
    -- The list allows to incrementally read from the write buffer without
    -- having to find the next entry in the Map again (requiring key
    -- comparisons) or having to copy out all entries.
    -- TODO: more efficient representation? benchmark!
  | ReadBuffer !(MutVar (PrimState m) [KOp m h])

type KOp m h = (SerialisedKey, Entry SerialisedValue (RawBlobRef m h))

{-# SPECIALISE new ::
     OffsetKey
  -> Maybe (WB.WriteBuffer, Ref (WB.WriteBufferBlobs IO h))
  -> V.Vector (Ref (Run IO h))
  -> IO (Maybe (Readers IO h)) #-}
new :: forall m h.
     (MonadMask m, MonadST m, MonadSTM m)
  => OffsetKey
  -> Maybe (WB.WriteBuffer, Ref (WB.WriteBufferBlobs m h))
  -> V.Vector (Ref (Run m h))
  -> m (Maybe (Readers m h))
new :: forall (m :: * -> *) h.
(MonadMask m, MonadST m, MonadSTM m) =>
OffsetKey
-> Maybe (WriteBuffer, Ref (WriteBufferBlobs m h))
-> Vector (Ref (Run m h))
-> m (Maybe (Readers m h))
new !OffsetKey
offsetKey Maybe (WriteBuffer, Ref (WriteBufferBlobs m h))
wbs Vector (Ref (Run m h))
runs = do
    Maybe (ReadCtx m h)
wBuffer <- m (Maybe (ReadCtx m h))
-> ((WriteBuffer, Ref (WriteBufferBlobs m h))
    -> m (Maybe (ReadCtx m h)))
-> Maybe (WriteBuffer, Ref (WriteBufferBlobs m h))
-> m (Maybe (ReadCtx m h))
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (Maybe (ReadCtx m h) -> m (Maybe (ReadCtx m h))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe (ReadCtx m h)
forall a. Maybe a
Nothing) ((WriteBuffer
 -> Ref (WriteBufferBlobs m h) -> m (Maybe (ReadCtx m h)))
-> (WriteBuffer, Ref (WriteBufferBlobs m h))
-> m (Maybe (ReadCtx m h))
forall a b c. (a -> b -> c) -> (a, b) -> c
uncurry WriteBuffer
-> Ref (WriteBufferBlobs m h) -> m (Maybe (ReadCtx m h))
fromWB) Maybe (WriteBuffer, Ref (WriteBufferBlobs m h))
wbs
    [Maybe (ReadCtx m h)]
readers <- (Int -> Ref (Run m h) -> m (Maybe (ReadCtx m h)))
-> [Int] -> [Ref (Run m h)] -> m [Maybe (ReadCtx m h)]
forall (m :: * -> *) a b c.
Applicative m =>
(a -> b -> m c) -> [a] -> [b] -> m [c]
zipWithM (ReaderNumber -> Ref (Run m h) -> m (Maybe (ReadCtx m h))
fromRun (ReaderNumber -> Ref (Run m h) -> m (Maybe (ReadCtx m h)))
-> (Int -> ReaderNumber)
-> Int
-> Ref (Run m h)
-> m (Maybe (ReadCtx m h))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> ReaderNumber
ReaderNumber) [Int
1..] (Vector (Ref (Run m h)) -> [Ref (Run m h)]
forall a. Vector a -> [a]
V.toList Vector (Ref (Run m h))
runs)
    let contexts :: Maybe (NonEmpty (ReadCtx m h))
contexts = [ReadCtx m h] -> Maybe (NonEmpty (ReadCtx m h))
forall a. [a] -> Maybe (NonEmpty a)
nonEmpty ([ReadCtx m h] -> Maybe (NonEmpty (ReadCtx m h)))
-> ([Maybe (ReadCtx m h)] -> [ReadCtx m h])
-> [Maybe (ReadCtx m h)]
-> Maybe (NonEmpty (ReadCtx m h))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [Maybe (ReadCtx m h)] -> [ReadCtx m h]
forall a. [Maybe a] -> [a]
catMaybes ([Maybe (ReadCtx m h)] -> Maybe (NonEmpty (ReadCtx m h)))
-> [Maybe (ReadCtx m h)] -> Maybe (NonEmpty (ReadCtx m h))
forall a b. (a -> b) -> a -> b
$ Maybe (ReadCtx m h)
wBuffer Maybe (ReadCtx m h)
-> [Maybe (ReadCtx m h)] -> [Maybe (ReadCtx m h)]
forall a. a -> [a] -> [a]
: [Maybe (ReadCtx m h)]
readers
    Maybe (NonEmpty (ReadCtx m h))
-> (NonEmpty (ReadCtx m h) -> m (Readers m h))
-> m (Maybe (Readers m h))
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
t a -> (a -> f b) -> f (t b)
for Maybe (NonEmpty (ReadCtx m h))
contexts ((NonEmpty (ReadCtx m h) -> m (Readers m h))
 -> m (Maybe (Readers m h)))
-> (NonEmpty (ReadCtx m h) -> m (Readers m h))
-> m (Maybe (Readers m h))
forall a b. (a -> b) -> a -> b
$ \NonEmpty (ReadCtx m h)
xs -> do
      (MutableHeap (PrimState m) (ReadCtx m h)
readersHeap, ReadCtx m h
readCtx) <- NonEmpty (ReadCtx m h)
-> m (MutableHeap (PrimState m) (ReadCtx m h), ReadCtx m h)
forall a (m :: * -> *).
(PrimMonad m, Ord a) =>
NonEmpty a -> m (MutableHeap (PrimState m) a, a)
Heap.newMutableHeap NonEmpty (ReadCtx m h)
xs
      MutVar (PrimState m) (ReadCtx m h)
readersNext <- ReadCtx m h -> m (MutVar (PrimState m) (ReadCtx m h))
forall (m :: * -> *) a.
PrimMonad m =>
a -> m (MutVar (PrimState m) a)
newMutVar ReadCtx m h
readCtx
      Readers m h -> m (Readers m h)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Readers {MutableHeap (PrimState m) (ReadCtx m h)
MutVar (PrimState m) (ReadCtx m h)
readersHeap :: MutableHeap (PrimState m) (ReadCtx m h)
readersNext :: MutVar (PrimState m) (ReadCtx m h)
readersHeap :: MutableHeap (PrimState m) (ReadCtx m h)
readersNext :: MutVar (PrimState m) (ReadCtx m h)
..}
  where
    fromWB :: WB.WriteBuffer
           -> Ref (WB.WriteBufferBlobs m h)
           -> m (Maybe (ReadCtx m h))
    fromWB :: WriteBuffer
-> Ref (WriteBufferBlobs m h) -> m (Maybe (ReadCtx m h))
fromWB WriteBuffer
wb Ref (WriteBufferBlobs m h)
wbblobs = do
        --TODO: this BlobSpan to BlobRef conversion involves quite a lot of allocation
        MutVar
  (PrimState m)
  [(SerialisedKey, Entry SerialisedValue (RawBlobRef m h))]
kops <- [(SerialisedKey, Entry SerialisedValue (RawBlobRef m h))]
-> m (MutVar
        (PrimState m)
        [(SerialisedKey, Entry SerialisedValue (RawBlobRef m h))])
forall (m :: * -> *) a.
PrimMonad m =>
a -> m (MutVar (PrimState m) a)
newMutVar ([(SerialisedKey, Entry SerialisedValue (RawBlobRef m h))]
 -> m (MutVar
         (PrimState m)
         [(SerialisedKey, Entry SerialisedValue (RawBlobRef m h))]))
-> [(SerialisedKey, Entry SerialisedValue (RawBlobRef m h))]
-> m (MutVar
        (PrimState m)
        [(SerialisedKey, Entry SerialisedValue (RawBlobRef m h))])
forall a b. (a -> b) -> a -> b
$ ((SerialisedKey, Entry SerialisedValue BlobSpan)
 -> (SerialisedKey, Entry SerialisedValue (RawBlobRef m h)))
-> [(SerialisedKey, Entry SerialisedValue BlobSpan)]
-> [(SerialisedKey, Entry SerialisedValue (RawBlobRef m h))]
forall a b. (a -> b) -> [a] -> [b]
map ((Entry SerialisedValue BlobSpan
 -> Entry SerialisedValue (RawBlobRef m h))
-> (SerialisedKey, Entry SerialisedValue BlobSpan)
-> (SerialisedKey, Entry SerialisedValue (RawBlobRef m h))
forall a b. (a -> b) -> (SerialisedKey, a) -> (SerialisedKey, b)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ((BlobSpan -> RawBlobRef m h)
-> Entry SerialisedValue BlobSpan
-> Entry SerialisedValue (RawBlobRef m h)
forall a b.
(a -> b) -> Entry SerialisedValue a -> Entry SerialisedValue b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (Ref (WriteBufferBlobs m h) -> BlobSpan -> RawBlobRef m h
forall (m :: * -> *) h.
Ref (WriteBufferBlobs m h) -> BlobSpan -> RawBlobRef m h
WB.mkRawBlobRef Ref (WriteBufferBlobs m h)
wbblobs))) ([(SerialisedKey, Entry SerialisedValue BlobSpan)]
 -> [(SerialisedKey, Entry SerialisedValue (RawBlobRef m h))])
-> [(SerialisedKey, Entry SerialisedValue BlobSpan)]
-> [(SerialisedKey, Entry SerialisedValue (RawBlobRef m h))]
forall a b. (a -> b) -> a -> b
$
                  Map SerialisedKey (Entry SerialisedValue BlobSpan)
-> [(SerialisedKey, Entry SerialisedValue BlobSpan)]
forall k a. Map k a -> [(k, a)]
Map.toList (Map SerialisedKey (Entry SerialisedValue BlobSpan)
 -> [(SerialisedKey, Entry SerialisedValue BlobSpan)])
-> Map SerialisedKey (Entry SerialisedValue BlobSpan)
-> [(SerialisedKey, Entry SerialisedValue BlobSpan)]
forall a b. (a -> b) -> a -> b
$ Map SerialisedKey (Entry SerialisedValue BlobSpan)
-> Map SerialisedKey (Entry SerialisedValue BlobSpan)
filterWB (Map SerialisedKey (Entry SerialisedValue BlobSpan)
 -> Map SerialisedKey (Entry SerialisedValue BlobSpan))
-> Map SerialisedKey (Entry SerialisedValue BlobSpan)
-> Map SerialisedKey (Entry SerialisedValue BlobSpan)
forall a b. (a -> b) -> a -> b
$ WriteBuffer -> Map SerialisedKey (Entry SerialisedValue BlobSpan)
WB.toMap WriteBuffer
wb
        ReaderNumber -> Reader m h -> m (Maybe (ReadCtx m h))
forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadST m) =>
ReaderNumber -> Reader m h -> m (Maybe (ReadCtx m h))
nextReadCtx (Int -> ReaderNumber
ReaderNumber Int
0) (MutVar
  (PrimState m)
  [(SerialisedKey, Entry SerialisedValue (RawBlobRef m h))]
-> Reader m h
forall (m :: * -> *) h.
MutVar (PrimState m) [KOp m h] -> Reader m h
ReadBuffer MutVar
  (PrimState m)
  [(SerialisedKey, Entry SerialisedValue (RawBlobRef m h))]
kops)
      where
        filterWB :: Map SerialisedKey (Entry SerialisedValue BlobSpan)
-> Map SerialisedKey (Entry SerialisedValue BlobSpan)
filterWB = case OffsetKey
offsetKey of
            OffsetKey
NoOffsetKey -> Map SerialisedKey (Entry SerialisedValue BlobSpan)
-> Map SerialisedKey (Entry SerialisedValue BlobSpan)
forall a. a -> a
id
            OffsetKey SerialisedKey
k -> (SerialisedKey -> Bool)
-> Map SerialisedKey (Entry SerialisedValue BlobSpan)
-> Map SerialisedKey (Entry SerialisedValue BlobSpan)
forall k a. (k -> Bool) -> Map k a -> Map k a
Map.dropWhileAntitone (SerialisedKey -> SerialisedKey -> Bool
forall a. Ord a => a -> a -> Bool
< SerialisedKey
k)

    fromRun :: ReaderNumber -> Ref (Run m h) -> m (Maybe (ReadCtx m h))
    fromRun :: ReaderNumber -> Ref (Run m h) -> m (Maybe (ReadCtx m h))
fromRun ReaderNumber
n Ref (Run m h)
run = do
        RunReader m h
reader <- OffsetKey -> Ref (Run m h) -> m (RunReader m h)
forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, PrimMonad m) =>
OffsetKey -> Ref (Run m h) -> m (RunReader m h)
Reader.new OffsetKey
offsetKey Ref (Run m h)
run
        ReaderNumber -> Reader m h -> m (Maybe (ReadCtx m h))
forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadST m) =>
ReaderNumber -> Reader m h -> m (Maybe (ReadCtx m h))
nextReadCtx ReaderNumber
n (RunReader m h -> Reader m h
forall (m :: * -> *) h. RunReader m h -> Reader m h
ReadRun RunReader m h
reader)

{-# SPECIALISE close ::
     Readers IO (FS.Handle h)
  -> IO () #-}
-- | Only call when aborting before all readers have been drained.
close ::
     (MonadMask m, MonadSTM m, PrimMonad m)
  => Readers m h
  -> m ()
close :: forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, PrimMonad m) =>
Readers m h -> m ()
close Readers {MutableHeap (PrimState m) (ReadCtx m h)
MutVar (PrimState m) (ReadCtx m h)
readersHeap :: forall (m :: * -> *) h.
Readers m h -> MutableHeap (PrimState m) (ReadCtx m h)
readersNext :: forall (m :: * -> *) h.
Readers m h -> MutVar (PrimState m) (ReadCtx m h)
readersHeap :: MutableHeap (PrimState m) (ReadCtx m h)
readersNext :: MutVar (PrimState m) (ReadCtx m h)
..} = do
    ReadCtx {Reader m h
readCtxReader :: forall (m :: * -> *) h. ReadCtx m h -> Reader m h
readCtxReader :: Reader m h
readCtxReader} <- MutVar (PrimState m) (ReadCtx m h) -> m (ReadCtx m h)
forall (m :: * -> *) a.
PrimMonad m =>
MutVar (PrimState m) a -> m a
readMutVar MutVar (PrimState m) (ReadCtx m h)
readersNext
    Reader m h -> m ()
forall {h}. Reader m h -> m ()
closeReader Reader m h
readCtxReader
    m ()
closeHeap
  where
    closeReader :: Reader m h -> m ()
closeReader = \case
        ReadRun RunReader m h
r    -> RunReader m h -> m ()
forall (m :: * -> *) h.
(MonadSTM m, MonadMask m, PrimMonad m) =>
RunReader m h -> m ()
Reader.close RunReader m h
r
        ReadBuffer MutVar (PrimState m) [KOp m h]
_ -> () -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
    closeHeap :: m ()
closeHeap =
        MutableHeap (PrimState m) (ReadCtx m h) -> m (Maybe (ReadCtx m h))
forall a (m :: * -> *).
(PrimMonad m, Ord a) =>
MutableHeap (PrimState m) a -> m (Maybe a)
Heap.extract MutableHeap (PrimState m) (ReadCtx m h)
readersHeap m (Maybe (ReadCtx m h)) -> (Maybe (ReadCtx m h) -> m ()) -> m ()
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
          Maybe (ReadCtx m h)
Nothing -> () -> m ()
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
          Just ReadCtx {Reader m h
readCtxReader :: forall (m :: * -> *) h. ReadCtx m h -> Reader m h
readCtxReader :: Reader m h
readCtxReader} -> do
            Reader m h -> m ()
forall {h}. Reader m h -> m ()
closeReader Reader m h
readCtxReader
            m ()
closeHeap

{-# SPECIALISE peekKey ::
     Readers IO h
  -> IO SerialisedKey #-}
peekKey ::
     PrimMonad m
  => Readers m h
  -> m SerialisedKey
peekKey :: forall (m :: * -> *) h.
PrimMonad m =>
Readers m h -> m SerialisedKey
peekKey Readers {MutableHeap (PrimState m) (ReadCtx m h)
MutVar (PrimState m) (ReadCtx m h)
readersHeap :: forall (m :: * -> *) h.
Readers m h -> MutableHeap (PrimState m) (ReadCtx m h)
readersNext :: forall (m :: * -> *) h.
Readers m h -> MutVar (PrimState m) (ReadCtx m h)
readersHeap :: MutableHeap (PrimState m) (ReadCtx m h)
readersNext :: MutVar (PrimState m) (ReadCtx m h)
..} = do
    ReadCtx m h -> SerialisedKey
forall (m :: * -> *) h. ReadCtx m h -> SerialisedKey
readCtxHeadKey (ReadCtx m h -> SerialisedKey)
-> m (ReadCtx m h) -> m SerialisedKey
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> MutVar (PrimState m) (ReadCtx m h) -> m (ReadCtx m h)
forall (m :: * -> *) a.
PrimMonad m =>
MutVar (PrimState m) a -> m a
readMutVar MutVar (PrimState m) (ReadCtx m h)
readersNext

-- | Once a function returned 'Drained', do not use the 'Readers' any more!
data HasMore = HasMore | Drained
  deriving stock (HasMore -> HasMore -> Bool
(HasMore -> HasMore -> Bool)
-> (HasMore -> HasMore -> Bool) -> Eq HasMore
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: HasMore -> HasMore -> Bool
== :: HasMore -> HasMore -> Bool
$c/= :: HasMore -> HasMore -> Bool
/= :: HasMore -> HasMore -> Bool
Eq, Int -> HasMore -> ShowS
[HasMore] -> ShowS
HasMore -> String
(Int -> HasMore -> ShowS)
-> (HasMore -> String) -> ([HasMore] -> ShowS) -> Show HasMore
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> HasMore -> ShowS
showsPrec :: Int -> HasMore -> ShowS
$cshow :: HasMore -> String
show :: HasMore -> String
$cshowList :: [HasMore] -> ShowS
showList :: [HasMore] -> ShowS
Show)

{-# SPECIALISE pop ::
    Readers IO h
  -> IO (SerialisedKey, Reader.Entry IO h, HasMore) #-}
pop ::
     (MonadMask m, MonadSTM m, MonadST m)
  => Readers m h
  -> m (SerialisedKey, Reader.Entry m h, HasMore)
pop :: forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadST m) =>
Readers m h -> m (SerialisedKey, Entry m h, HasMore)
pop r :: Readers m h
r@Readers {MutableHeap (PrimState m) (ReadCtx m h)
MutVar (PrimState m) (ReadCtx m h)
readersHeap :: forall (m :: * -> *) h.
Readers m h -> MutableHeap (PrimState m) (ReadCtx m h)
readersNext :: forall (m :: * -> *) h.
Readers m h -> MutVar (PrimState m) (ReadCtx m h)
readersHeap :: MutableHeap (PrimState m) (ReadCtx m h)
readersNext :: MutVar (PrimState m) (ReadCtx m h)
..} = do
    ReadCtx {SerialisedKey
Entry m h
Reader m h
ReaderNumber
readCtxHeadKey :: forall (m :: * -> *) h. ReadCtx m h -> SerialisedKey
readCtxHeadEntry :: forall (m :: * -> *) h. ReadCtx m h -> Entry m h
readCtxNumber :: forall (m :: * -> *) h. ReadCtx m h -> ReaderNumber
readCtxReader :: forall (m :: * -> *) h. ReadCtx m h -> Reader m h
readCtxHeadKey :: SerialisedKey
readCtxHeadEntry :: Entry m h
readCtxNumber :: ReaderNumber
readCtxReader :: Reader m h
..} <- MutVar (PrimState m) (ReadCtx m h) -> m (ReadCtx m h)
forall (m :: * -> *) a.
PrimMonad m =>
MutVar (PrimState m) a -> m a
readMutVar MutVar (PrimState m) (ReadCtx m h)
readersNext
    HasMore
hasMore <- Readers m h -> ReaderNumber -> Reader m h -> m HasMore
forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadST m) =>
Readers m h -> ReaderNumber -> Reader m h -> m HasMore
dropOne Readers m h
r ReaderNumber
readCtxNumber Reader m h
readCtxReader
    (SerialisedKey, Entry m h, HasMore)
-> m (SerialisedKey, Entry m h, HasMore)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (SerialisedKey
readCtxHeadKey, Entry m h
readCtxHeadEntry, HasMore
hasMore)

{-# SPECIALISE dropWhileKey ::
     Readers IO h
  -> SerialisedKey
  -> IO (Int, HasMore) #-}
dropWhileKey ::
     (MonadMask m, MonadSTM m, MonadST m)
  => Readers m h
  -> SerialisedKey
  -> m (Int, HasMore)  -- ^ How many were dropped?
dropWhileKey :: forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadST m) =>
Readers m h -> SerialisedKey -> m (Int, HasMore)
dropWhileKey Readers {MutableHeap (PrimState m) (ReadCtx m h)
MutVar (PrimState m) (ReadCtx m h)
readersHeap :: forall (m :: * -> *) h.
Readers m h -> MutableHeap (PrimState m) (ReadCtx m h)
readersNext :: forall (m :: * -> *) h.
Readers m h -> MutVar (PrimState m) (ReadCtx m h)
readersHeap :: MutableHeap (PrimState m) (ReadCtx m h)
readersNext :: MutVar (PrimState m) (ReadCtx m h)
..} SerialisedKey
key = do
    ReadCtx m h
cur <- MutVar (PrimState m) (ReadCtx m h) -> m (ReadCtx m h)
forall (m :: * -> *) a.
PrimMonad m =>
MutVar (PrimState m) a -> m a
readMutVar MutVar (PrimState m) (ReadCtx m h)
readersNext
    if ReadCtx m h -> SerialisedKey
forall (m :: * -> *) h. ReadCtx m h -> SerialisedKey
readCtxHeadKey ReadCtx m h
cur SerialisedKey -> SerialisedKey -> Bool
forall a. Eq a => a -> a -> Bool
== SerialisedKey
key
      then Int -> ReadCtx m h -> m (Int, HasMore)
go Int
0 ReadCtx m h
cur
      else (Int, HasMore) -> m (Int, HasMore)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Int
0, HasMore
HasMore)  -- nothing to do
  where
    -- invariant: @readCtxHeadKey == key@
    go :: Int -> ReadCtx m h -> m (Int, HasMore)
go !Int
n ReadCtx {ReaderNumber
readCtxNumber :: forall (m :: * -> *) h. ReadCtx m h -> ReaderNumber
readCtxNumber :: ReaderNumber
readCtxNumber, Reader m h
readCtxReader :: forall (m :: * -> *) h. ReadCtx m h -> Reader m h
readCtxReader :: Reader m h
readCtxReader} = do
        Maybe (ReadCtx m h)
mNext <- ReaderNumber -> Reader m h -> m (Maybe (ReadCtx m h))
forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadST m) =>
ReaderNumber -> Reader m h -> m (Maybe (ReadCtx m h))
nextReadCtx ReaderNumber
readCtxNumber Reader m h
readCtxReader m (Maybe (ReadCtx m h))
-> (Maybe (ReadCtx m h) -> m (Maybe (ReadCtx m h)))
-> m (Maybe (ReadCtx m h))
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
          Maybe (ReadCtx m h)
Nothing  -> MutableHeap (PrimState m) (ReadCtx m h) -> m (Maybe (ReadCtx m h))
forall a (m :: * -> *).
(PrimMonad m, Ord a) =>
MutableHeap (PrimState m) a -> m (Maybe a)
Heap.extract MutableHeap (PrimState m) (ReadCtx m h)
readersHeap
          Just ReadCtx m h
ctx -> ReadCtx m h -> Maybe (ReadCtx m h)
forall a. a -> Maybe a
Just (ReadCtx m h -> Maybe (ReadCtx m h))
-> m (ReadCtx m h) -> m (Maybe (ReadCtx m h))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> MutableHeap (PrimState m) (ReadCtx m h)
-> ReadCtx m h -> m (ReadCtx m h)
forall a (m :: * -> *).
(PrimMonad m, Ord a) =>
MutableHeap (PrimState m) a -> a -> m a
Heap.replaceRoot MutableHeap (PrimState m) (ReadCtx m h)
readersHeap ReadCtx m h
ctx
        let !n' :: Int
n' = Int
n Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1
        case Maybe (ReadCtx m h)
mNext of
          Maybe (ReadCtx m h)
Nothing -> do
            (Int, HasMore) -> m (Int, HasMore)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Int
n', HasMore
Drained)
          Just ReadCtx m h
next -> do
            -- hasMore
            if ReadCtx m h -> SerialisedKey
forall (m :: * -> *) h. ReadCtx m h -> SerialisedKey
readCtxHeadKey ReadCtx m h
next SerialisedKey -> SerialisedKey -> Bool
forall a. Eq a => a -> a -> Bool
== SerialisedKey
key
              then
                Int -> ReadCtx m h -> m (Int, HasMore)
go Int
n' ReadCtx m h
next
              else do
                MutVar (PrimState m) (ReadCtx m h) -> ReadCtx m h -> m ()
forall (m :: * -> *) a.
PrimMonad m =>
MutVar (PrimState m) a -> a -> m ()
writeMutVar MutVar (PrimState m) (ReadCtx m h)
readersNext ReadCtx m h
next
                (Int, HasMore) -> m (Int, HasMore)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Int
n', HasMore
HasMore)

{-# SPECIALISE dropOne ::
     Readers IO h
  -> ReaderNumber
  -> Reader IO h
  -> IO HasMore #-}
dropOne ::
     (MonadMask m, MonadSTM m, MonadST m)
  => Readers m h
  -> ReaderNumber
  -> Reader m h
  -> m HasMore
dropOne :: forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadST m) =>
Readers m h -> ReaderNumber -> Reader m h -> m HasMore
dropOne Readers {MutableHeap (PrimState m) (ReadCtx m h)
MutVar (PrimState m) (ReadCtx m h)
readersHeap :: forall (m :: * -> *) h.
Readers m h -> MutableHeap (PrimState m) (ReadCtx m h)
readersNext :: forall (m :: * -> *) h.
Readers m h -> MutVar (PrimState m) (ReadCtx m h)
readersHeap :: MutableHeap (PrimState m) (ReadCtx m h)
readersNext :: MutVar (PrimState m) (ReadCtx m h)
..} ReaderNumber
number Reader m h
reader = do
    Maybe (ReadCtx m h)
mNext <- ReaderNumber -> Reader m h -> m (Maybe (ReadCtx m h))
forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadST m) =>
ReaderNumber -> Reader m h -> m (Maybe (ReadCtx m h))
nextReadCtx ReaderNumber
number Reader m h
reader m (Maybe (ReadCtx m h))
-> (Maybe (ReadCtx m h) -> m (Maybe (ReadCtx m h)))
-> m (Maybe (ReadCtx m h))
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
      Maybe (ReadCtx m h)
Nothing  -> MutableHeap (PrimState m) (ReadCtx m h) -> m (Maybe (ReadCtx m h))
forall a (m :: * -> *).
(PrimMonad m, Ord a) =>
MutableHeap (PrimState m) a -> m (Maybe a)
Heap.extract MutableHeap (PrimState m) (ReadCtx m h)
readersHeap
      Just ReadCtx m h
ctx -> ReadCtx m h -> Maybe (ReadCtx m h)
forall a. a -> Maybe a
Just (ReadCtx m h -> Maybe (ReadCtx m h))
-> m (ReadCtx m h) -> m (Maybe (ReadCtx m h))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> MutableHeap (PrimState m) (ReadCtx m h)
-> ReadCtx m h -> m (ReadCtx m h)
forall a (m :: * -> *).
(PrimMonad m, Ord a) =>
MutableHeap (PrimState m) a -> a -> m a
Heap.replaceRoot MutableHeap (PrimState m) (ReadCtx m h)
readersHeap ReadCtx m h
ctx
    case Maybe (ReadCtx m h)
mNext of
      Maybe (ReadCtx m h)
Nothing ->
        HasMore -> m HasMore
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return HasMore
Drained
      Just ReadCtx m h
next -> do
        MutVar (PrimState m) (ReadCtx m h) -> ReadCtx m h -> m ()
forall (m :: * -> *) a.
PrimMonad m =>
MutVar (PrimState m) a -> a -> m ()
writeMutVar MutVar (PrimState m) (ReadCtx m h)
readersNext ReadCtx m h
next
        HasMore -> m HasMore
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return HasMore
HasMore

{-# SPECIALISE nextReadCtx ::
     ReaderNumber
  -> Reader IO h
  -> IO (Maybe (ReadCtx IO h)) #-}
nextReadCtx ::
     (MonadMask m, MonadSTM m, MonadST m)
  => ReaderNumber
  -> Reader m h
  -> m (Maybe (ReadCtx m h))
nextReadCtx :: forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadST m) =>
ReaderNumber -> Reader m h -> m (Maybe (ReadCtx m h))
nextReadCtx ReaderNumber
readCtxNumber Reader m h
readCtxReader =
    case Reader m h
readCtxReader of
      ReadRun RunReader m h
r -> RunReader m h -> m (Result m h)
forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadST m) =>
RunReader m h -> m (Result m h)
Reader.next RunReader m h
r m (Result m h)
-> (Result m h -> Maybe (ReadCtx m h)) -> m (Maybe (ReadCtx m h))
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \case
        Result m h
Reader.Empty ->
          Maybe (ReadCtx m h)
forall a. Maybe a
Nothing
        Reader.ReadEntry SerialisedKey
readCtxHeadKey Entry m h
readCtxHeadEntry ->
          ReadCtx m h -> Maybe (ReadCtx m h)
forall a. a -> Maybe a
Just ReadCtx {SerialisedKey
Entry m h
Reader m h
ReaderNumber
readCtxHeadKey :: SerialisedKey
readCtxHeadEntry :: Entry m h
readCtxNumber :: ReaderNumber
readCtxReader :: Reader m h
readCtxNumber :: ReaderNumber
readCtxReader :: Reader m h
readCtxHeadKey :: SerialisedKey
readCtxHeadEntry :: Entry m h
..}
      ReadBuffer MutVar (PrimState m) [KOp m h]
r -> MutVar (PrimState m) [KOp m h]
-> ([KOp m h] -> ([KOp m h], Maybe (ReadCtx m h)))
-> m (Maybe (ReadCtx m h))
forall (m :: * -> *) a b.
PrimMonad m =>
MutVar (PrimState m) a -> (a -> (a, b)) -> m b
atomicModifyMutVar MutVar (PrimState m) [KOp m h]
r (([KOp m h] -> ([KOp m h], Maybe (ReadCtx m h)))
 -> m (Maybe (ReadCtx m h)))
-> ([KOp m h] -> ([KOp m h], Maybe (ReadCtx m h)))
-> m (Maybe (ReadCtx m h))
forall a b. (a -> b) -> a -> b
$ \case
        [] ->
          ([], Maybe (ReadCtx m h)
forall a. Maybe a
Nothing)
        ((SerialisedKey
readCtxHeadKey, Entry SerialisedValue (RawBlobRef m h)
e) : [KOp m h]
rest) ->
          let readCtxHeadEntry :: Entry m h
readCtxHeadEntry = Entry SerialisedValue (RawBlobRef m h) -> Entry m h
forall (m :: * -> *) h.
Entry SerialisedValue (RawBlobRef m h) -> Entry m h
Reader.Entry Entry SerialisedValue (RawBlobRef m h)
e
          in  ([KOp m h]
rest, ReadCtx m h -> Maybe (ReadCtx m h)
forall a. a -> Maybe a
Just ReadCtx {SerialisedKey
Entry m h
Reader m h
ReaderNumber
readCtxHeadKey :: SerialisedKey
readCtxHeadEntry :: Entry m h
readCtxNumber :: ReaderNumber
readCtxReader :: Reader m h
readCtxNumber :: ReaderNumber
readCtxReader :: Reader m h
readCtxHeadKey :: SerialisedKey
readCtxHeadEntry :: Entry m h
..})