{-# OPTIONS_HADDOCK not-home #-}

-- | Multiple inputs (write buffers, runs) that are being read incrementally.
module Database.LSMTree.Internal.Readers (
    Readers (..)
  , OffsetKey (..)
  , ReaderSource (..)
  , ReadersMergeType (..)
  , new
  , close
  , peekKey
  , HasMore (..)
  , pop
  , dropWhileKey
    -- * Internals
  , Reader (..)
  , ReaderNumber (..)
  , ReadCtx (..)
  ) where

import           Control.Monad (zipWithM)
import           Control.Monad.Class.MonadST (MonadST)
import           Control.Monad.Class.MonadSTM (MonadSTM (..))
import           Control.Monad.Class.MonadThrow (MonadMask)
import           Control.Monad.Primitive
import           Control.RefCount
import           Data.Function (on)
import           Data.Functor ((<&>))
import           Data.List.NonEmpty (nonEmpty)
import qualified Data.Map.Strict as Map
import           Data.Maybe (catMaybes)
import           Data.Primitive.MutVar
import           Data.Traversable (for)
import           Database.LSMTree.Internal.BlobRef (BlobSpan, RawBlobRef)
import           Database.LSMTree.Internal.Entry (Entry (..))
import qualified Database.LSMTree.Internal.Entry as Entry
import           Database.LSMTree.Internal.Index.CompactAcc (SMaybe (..),
                     smaybe)
import           Database.LSMTree.Internal.Run (Run)
import           Database.LSMTree.Internal.RunReader (OffsetKey (..),
                     RunReader (..))
import qualified Database.LSMTree.Internal.RunReader as RunReader
import           Database.LSMTree.Internal.Serialise
import qualified Database.LSMTree.Internal.WriteBuffer as WB
import qualified Database.LSMTree.Internal.WriteBufferBlobs as WB
import qualified KMerge.Heap as Heap
import qualified System.FS.API as FS

-- | A collection of runs and write buffers being read from, yielding elements
-- in order. More precisely, that means first ordered by their key, then by the
-- input run they came from. This is important for resolving multiple entries
-- with the same key into one.
--
-- Construct with 'new', then keep calling 'pop'.
-- If aborting early, remember to call 'close'!
--
-- Creating a 'Readers' does not retain a reference to the input 'Run's or the
-- 'WriteBufferBlobs', but does retain an independent reference on their blob
-- files. It is not necessary to separately retain the 'Run's or the
-- 'WriteBufferBlobs' for correct use of the 'Readers'. There is one important
-- caveat however: to preserve the validity of 'BlobRef's then it is necessary
-- to separately retain a reference to the 'Run' or its 'BlobFile' to preserve
-- the validity of 'BlobRefs'.
--
-- TODO: do this more nicely by changing 'Reader' to preserve the 'BlobFile'
-- ref until it is explicitly closed, and also retain the 'BlobFile' from the
-- WBB and release all of these 'BlobFiles' once the 'Readers' is itself closed.
--
data Readers m h = Readers {
      forall (m :: * -> *) h.
Readers m h -> MutableHeap (PrimState m) (ReadCtx m h)
readersHeap :: !(Heap.MutableHeap (PrimState m) (ReadCtx m h))
      -- | Since there is always one reader outside of the heap, we need to
      -- store it separately. This also contains the next k\/op to yield, unless
      -- all readers are drained, i.e. both:
      -- 1. the reader inside the 'ReadCtx' is empty
      -- 2. the heap is empty
    , forall (m :: * -> *) h.
Readers m h -> MutVar (PrimState m) (ReadCtx m h)
readersNext :: !(MutVar (PrimState m) (ReadCtx m h))
    }

newtype ReaderNumber = ReaderNumber Int
  deriving stock (ReaderNumber -> ReaderNumber -> Bool
(ReaderNumber -> ReaderNumber -> Bool)
-> (ReaderNumber -> ReaderNumber -> Bool) -> Eq ReaderNumber
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: ReaderNumber -> ReaderNumber -> Bool
== :: ReaderNumber -> ReaderNumber -> Bool
$c/= :: ReaderNumber -> ReaderNumber -> Bool
/= :: ReaderNumber -> ReaderNumber -> Bool
Eq, Eq ReaderNumber
Eq ReaderNumber =>
(ReaderNumber -> ReaderNumber -> Ordering)
-> (ReaderNumber -> ReaderNumber -> Bool)
-> (ReaderNumber -> ReaderNumber -> Bool)
-> (ReaderNumber -> ReaderNumber -> Bool)
-> (ReaderNumber -> ReaderNumber -> Bool)
-> (ReaderNumber -> ReaderNumber -> ReaderNumber)
-> (ReaderNumber -> ReaderNumber -> ReaderNumber)
-> Ord ReaderNumber
ReaderNumber -> ReaderNumber -> Bool
ReaderNumber -> ReaderNumber -> Ordering
ReaderNumber -> ReaderNumber -> ReaderNumber
forall a.
Eq a =>
(a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
$ccompare :: ReaderNumber -> ReaderNumber -> Ordering
compare :: ReaderNumber -> ReaderNumber -> Ordering
$c< :: ReaderNumber -> ReaderNumber -> Bool
< :: ReaderNumber -> ReaderNumber -> Bool
$c<= :: ReaderNumber -> ReaderNumber -> Bool
<= :: ReaderNumber -> ReaderNumber -> Bool
$c> :: ReaderNumber -> ReaderNumber -> Bool
> :: ReaderNumber -> ReaderNumber -> Bool
$c>= :: ReaderNumber -> ReaderNumber -> Bool
>= :: ReaderNumber -> ReaderNumber -> Bool
$cmax :: ReaderNumber -> ReaderNumber -> ReaderNumber
max :: ReaderNumber -> ReaderNumber -> ReaderNumber
$cmin :: ReaderNumber -> ReaderNumber -> ReaderNumber
min :: ReaderNumber -> ReaderNumber -> ReaderNumber
Ord)

-- | Each heap element needs some more context than just the reader.
-- E.g. the 'Eq' instance we need to be able to access the first key to be read
-- in a pure way.
--
-- TODO(optimisation): We allocate this record for each k/op. This might be
-- avoidable, see ideas below.
data ReadCtx m h = ReadCtx {
      -- We could avoid this using a more specialised mutable heap with separate
      -- arrays for keys and values (or even each of their components).
      -- Using an 'STRef' could avoid reallocating the record for every entry,
      -- but that might not be straightforward to integrate with the heap.
      forall (m :: * -> *) h. ReadCtx m h -> SerialisedKey
readCtxHeadKey   :: !SerialisedKey
    , forall (m :: * -> *) h. ReadCtx m h -> Entry m h
readCtxHeadEntry :: !(RunReader.Entry m h)
      -- We could get rid of this by making 'LoserTree' stable (for which there
      -- is a prototype already).
      -- Alternatively, if we decide to have an invariant that the number in
      -- 'RunFsPaths' is always higher for newer runs, then we could use that
      -- in the 'Ord' instance.
    , forall (m :: * -> *) h. ReadCtx m h -> ReaderNumber
readCtxNumber    :: !ReaderNumber
    , forall (m :: * -> *) h. ReadCtx m h -> Reader m h
readCtxReader    :: !(Reader m h)
    }

instance Eq (ReadCtx m h) where
  == :: ReadCtx m h -> ReadCtx m h -> Bool
(==) = (SerialisedKey, ReaderNumber)
-> (SerialisedKey, ReaderNumber) -> Bool
forall a. Eq a => a -> a -> Bool
(==) ((SerialisedKey, ReaderNumber)
 -> (SerialisedKey, ReaderNumber) -> Bool)
-> (ReadCtx m h -> (SerialisedKey, ReaderNumber))
-> ReadCtx m h
-> ReadCtx m h
-> Bool
forall b c a. (b -> b -> c) -> (a -> b) -> a -> a -> c
`on` (\ReadCtx m h
r -> (ReadCtx m h -> SerialisedKey
forall (m :: * -> *) h. ReadCtx m h -> SerialisedKey
readCtxHeadKey ReadCtx m h
r, ReadCtx m h -> ReaderNumber
forall (m :: * -> *) h. ReadCtx m h -> ReaderNumber
readCtxNumber ReadCtx m h
r))

-- | Makes sure we resolve entries in the right order.
instance Ord (ReadCtx m h) where
  compare :: ReadCtx m h -> ReadCtx m h -> Ordering
compare = (SerialisedKey, ReaderNumber)
-> (SerialisedKey, ReaderNumber) -> Ordering
forall a. Ord a => a -> a -> Ordering
compare ((SerialisedKey, ReaderNumber)
 -> (SerialisedKey, ReaderNumber) -> Ordering)
-> (ReadCtx m h -> (SerialisedKey, ReaderNumber))
-> ReadCtx m h
-> ReadCtx m h
-> Ordering
forall b c a. (b -> b -> c) -> (a -> b) -> a -> a -> c
`on` (\ReadCtx m h
r -> (ReadCtx m h -> SerialisedKey
forall (m :: * -> *) h. ReadCtx m h -> SerialisedKey
readCtxHeadKey ReadCtx m h
r, ReadCtx m h -> ReaderNumber
forall (m :: * -> *) h. ReadCtx m h -> ReaderNumber
readCtxNumber ReadCtx m h
r))

-- | An individual reader must be able to produce a sequence of pairs of
-- 'SerialisedKey' and 'RunReader.Entry', with ordered und unique keys.
--
-- TODO: This is slightly inelegant. This module could work generally for
-- anything that can produce elements, but currently is very specific to having
-- write buffer and run readers. Also, for run merging, no write buffer is
-- involved, but we still need to branch on this sum type.
-- A more general version is possible, but despite SPECIALISE-ing everything
-- showed ~100 bytes of extra allocations per entry that is read (which might be
-- avoidable with some tinkering).
data Reader m h =
    -- | The list allows to incrementally read from the write buffer without
    -- having to find the next entry in the Map again (requiring key
    -- comparisons) or having to copy out all entries.
    --
    -- TODO: more efficient representation? benchmark!
    ReadBuffer  !(MutVar (PrimState m) [KOp m h])
  | ReadRun     !(RunReader m h)
    -- | Recursively read from another reader. This requires keeping track of
    -- its 'HasMore' status, since we should not try to read another entry from
    -- it once it is drained.
    --
    -- We represent the recursive reader and 'HasMore' status together as a
    -- 'Maybe' 'Readers'. The reason is subtle: once a 'Readers' becomes drained
    -- it is immediately closed, after which the structure should not be used
    -- anymore or you'd be using resources after they have been closed already.
    --
    -- TODO: maybe it's a slightly more ergonomic alternative to no close the
    -- 'Readers' automatically.
  | ReadReaders !ReadersMergeType !(SMaybe (Readers m h))

type KOp m h = (SerialisedKey, Entry SerialisedValue (RawBlobRef m h))

data ReaderSource m h =
    FromWriteBuffer !WB.WriteBuffer !(Ref (WB.WriteBufferBlobs m h))
  | FromRun         !(Ref (Run m h))
    -- | Recursive case, allowing to build a tree of readers for a merging tree.
  | FromReaders     !ReadersMergeType ![ReaderSource m h]

{-# SPECIALISE new ::
     ResolveSerialisedValue
  -> OffsetKey
  -> [ReaderSource IO h]
  -> IO (Maybe (Readers IO h)) #-}
new :: forall m h.
     (MonadMask m, MonadST m, MonadSTM m)
  => ResolveSerialisedValue
  -> OffsetKey
  -> [ReaderSource m h]
  -> m (Maybe (Readers m h))
new :: forall (m :: * -> *) h.
(MonadMask m, MonadST m, MonadSTM m) =>
ResolveSerialisedValue
-> OffsetKey -> [ReaderSource m h] -> m (Maybe (Readers m h))
new ResolveSerialisedValue
resolve !OffsetKey
offsetKey [ReaderSource m h]
sources = do
    [Maybe (ReadCtx m h)]
readers <- (Int -> ReaderSource m h -> m (Maybe (ReadCtx m h)))
-> [Int] -> [ReaderSource m h] -> m [Maybe (ReadCtx m h)]
forall (m :: * -> *) a b c.
Applicative m =>
(a -> b -> m c) -> [a] -> [b] -> m [c]
zipWithM (ReaderNumber -> ReaderSource m h -> m (Maybe (ReadCtx m h))
fromSource (ReaderNumber -> ReaderSource m h -> m (Maybe (ReadCtx m h)))
-> (Int -> ReaderNumber)
-> Int
-> ReaderSource m h
-> m (Maybe (ReadCtx m h))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> ReaderNumber
ReaderNumber) [Int
1..] [ReaderSource m h]
sources
    Maybe (NonEmpty (ReadCtx m h))
-> (NonEmpty (ReadCtx m h) -> m (Readers m h))
-> m (Maybe (Readers m h))
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
t a -> (a -> f b) -> f (t b)
for ([ReadCtx m h] -> Maybe (NonEmpty (ReadCtx m h))
forall a. [a] -> Maybe (NonEmpty a)
nonEmpty ([Maybe (ReadCtx m h)] -> [ReadCtx m h]
forall a. [Maybe a] -> [a]
catMaybes [Maybe (ReadCtx m h)]
readers)) ((NonEmpty (ReadCtx m h) -> m (Readers m h))
 -> m (Maybe (Readers m h)))
-> (NonEmpty (ReadCtx m h) -> m (Readers m h))
-> m (Maybe (Readers m h))
forall a b. (a -> b) -> a -> b
$ \NonEmpty (ReadCtx m h)
xs -> do
      (MutableHeap (PrimState m) (ReadCtx m h)
readersHeap, ReadCtx m h
readCtx) <- NonEmpty (ReadCtx m h)
-> m (MutableHeap (PrimState m) (ReadCtx m h), ReadCtx m h)
forall a (m :: * -> *).
(PrimMonad m, Ord a) =>
NonEmpty a -> m (MutableHeap (PrimState m) a, a)
Heap.newMutableHeap NonEmpty (ReadCtx m h)
xs
      MutVar (PrimState m) (ReadCtx m h)
readersNext <- ReadCtx m h -> m (MutVar (PrimState m) (ReadCtx m h))
forall (m :: * -> *) a.
PrimMonad m =>
a -> m (MutVar (PrimState m) a)
newMutVar ReadCtx m h
readCtx
      Readers m h -> m (Readers m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Readers {MutableHeap (PrimState m) (ReadCtx m h)
MutVar (PrimState m) (ReadCtx m h)
readersHeap :: MutableHeap (PrimState m) (ReadCtx m h)
readersNext :: MutVar (PrimState m) (ReadCtx m h)
readersHeap :: MutableHeap (PrimState m) (ReadCtx m h)
readersNext :: MutVar (PrimState m) (ReadCtx m h)
..}
  where
    fromSource :: ReaderNumber -> ReaderSource m h -> m (Maybe (ReadCtx m h))
    fromSource :: ReaderNumber -> ReaderSource m h -> m (Maybe (ReadCtx m h))
fromSource ReaderNumber
n ReaderSource m h
src =
        case ReaderSource m h
src of
          FromWriteBuffer WriteBuffer
wb Ref (WriteBufferBlobs m h)
wbblobs -> do
            Reader m h
rs <- WriteBuffer -> Ref (WriteBufferBlobs m h) -> m (Reader m h)
fromWB WriteBuffer
wb Ref (WriteBufferBlobs m h)
wbblobs
            ResolveSerialisedValue
-> ReaderNumber -> Reader m h -> m (Maybe (ReadCtx m h))
forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadST m) =>
ResolveSerialisedValue
-> ReaderNumber -> Reader m h -> m (Maybe (ReadCtx m h))
nextReadCtx ResolveSerialisedValue
resolve ReaderNumber
n Reader m h
rs
          FromRun Ref (Run m h)
r -> do
            Reader m h
rs <- RunReader m h -> Reader m h
forall (m :: * -> *) h. RunReader m h -> Reader m h
ReadRun (RunReader m h -> Reader m h)
-> m (RunReader m h) -> m (Reader m h)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> OffsetKey -> Ref (Run m h) -> m (RunReader m h)
forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, PrimMonad m) =>
OffsetKey -> Ref (Run m h) -> m (RunReader m h)
RunReader.new OffsetKey
offsetKey Ref (Run m h)
r
            ResolveSerialisedValue
-> ReaderNumber -> Reader m h -> m (Maybe (ReadCtx m h))
forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadST m) =>
ResolveSerialisedValue
-> ReaderNumber -> Reader m h -> m (Maybe (ReadCtx m h))
nextReadCtx ResolveSerialisedValue
resolve ReaderNumber
n Reader m h
rs
          FromReaders ReadersMergeType
mergeType [ReaderSource m h]
nestedSources -> do
            ResolveSerialisedValue
-> OffsetKey -> [ReaderSource m h] -> m (Maybe (Readers m h))
forall (m :: * -> *) h.
(MonadMask m, MonadST m, MonadSTM m) =>
ResolveSerialisedValue
-> OffsetKey -> [ReaderSource m h] -> m (Maybe (Readers m h))
new ResolveSerialisedValue
resolve OffsetKey
offsetKey [ReaderSource m h]
nestedSources m (Maybe (Readers m h))
-> (Maybe (Readers m h) -> m (Maybe (ReadCtx m h)))
-> m (Maybe (ReadCtx m h))
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
              Maybe (Readers m h)
Nothing -> Maybe (ReadCtx m h) -> m (Maybe (ReadCtx m h))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe (ReadCtx m h)
forall a. Maybe a
Nothing
              Just Readers m h
rs -> ResolveSerialisedValue
-> ReaderNumber -> Reader m h -> m (Maybe (ReadCtx m h))
forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadST m) =>
ResolveSerialisedValue
-> ReaderNumber -> Reader m h -> m (Maybe (ReadCtx m h))
nextReadCtx ResolveSerialisedValue
resolve ReaderNumber
n (ReadersMergeType -> SMaybe (Readers m h) -> Reader m h
forall (m :: * -> *) h.
ReadersMergeType -> SMaybe (Readers m h) -> Reader m h
ReadReaders ReadersMergeType
mergeType (Readers m h -> SMaybe (Readers m h)
forall a. a -> SMaybe a
SJust Readers m h
rs))

    fromWB :: WB.WriteBuffer -> Ref (WB.WriteBufferBlobs m h) -> m (Reader m h)
    fromWB :: WriteBuffer -> Ref (WriteBufferBlobs m h) -> m (Reader m h)
fromWB WriteBuffer
wb Ref (WriteBufferBlobs m h)
wbblobs = do
        let kops :: [(SerialisedKey, Entry SerialisedValue BlobSpan)]
kops = Map SerialisedKey (Entry SerialisedValue BlobSpan)
-> [(SerialisedKey, Entry SerialisedValue BlobSpan)]
forall k a. Map k a -> [(k, a)]
Map.toList (Map SerialisedKey (Entry SerialisedValue BlobSpan)
 -> [(SerialisedKey, Entry SerialisedValue BlobSpan)])
-> Map SerialisedKey (Entry SerialisedValue BlobSpan)
-> [(SerialisedKey, Entry SerialisedValue BlobSpan)]
forall a b. (a -> b) -> a -> b
$ Map SerialisedKey (Entry SerialisedValue BlobSpan)
-> Map SerialisedKey (Entry SerialisedValue BlobSpan)
filterWB (Map SerialisedKey (Entry SerialisedValue BlobSpan)
 -> Map SerialisedKey (Entry SerialisedValue BlobSpan))
-> Map SerialisedKey (Entry SerialisedValue BlobSpan)
-> Map SerialisedKey (Entry SerialisedValue BlobSpan)
forall a b. (a -> b) -> a -> b
$ WriteBuffer -> Map SerialisedKey (Entry SerialisedValue BlobSpan)
WB.toMap WriteBuffer
wb
        MutVar (PrimState m) [KOp m h] -> Reader m h
forall (m :: * -> *) h.
MutVar (PrimState m) [KOp m h] -> Reader m h
ReadBuffer (MutVar (PrimState m) [KOp m h] -> Reader m h)
-> m (MutVar (PrimState m) [KOp m h]) -> m (Reader m h)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [KOp m h] -> m (MutVar (PrimState m) [KOp m h])
forall (m :: * -> *) a.
PrimMonad m =>
a -> m (MutVar (PrimState m) a)
newMutVar (((SerialisedKey, Entry SerialisedValue BlobSpan) -> KOp m h)
-> [(SerialisedKey, Entry SerialisedValue BlobSpan)] -> [KOp m h]
forall a b. (a -> b) -> [a] -> [b]
map (SerialisedKey, Entry SerialisedValue BlobSpan) -> KOp m h
forall k v. (k, Entry v BlobSpan) -> (k, Entry v (RawBlobRef m h))
convertBlobs [(SerialisedKey, Entry SerialisedValue BlobSpan)]
kops)
      where
        -- TODO: this conversion involves quite a lot of allocation
        convertBlobs :: (k, Entry v BlobSpan) -> (k, Entry v (RawBlobRef m h))
        convertBlobs :: forall k v. (k, Entry v BlobSpan) -> (k, Entry v (RawBlobRef m h))
convertBlobs = (Entry v BlobSpan -> Entry v (RawBlobRef m h))
-> (k, Entry v BlobSpan) -> (k, Entry v (RawBlobRef m h))
forall a b. (a -> b) -> (k, a) -> (k, b)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ((BlobSpan -> RawBlobRef m h)
-> Entry v BlobSpan -> Entry v (RawBlobRef m h)
forall a b. (a -> b) -> Entry v a -> Entry v b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (Ref (WriteBufferBlobs m h) -> BlobSpan -> RawBlobRef m h
forall (m :: * -> *) h.
Ref (WriteBufferBlobs m h) -> BlobSpan -> RawBlobRef m h
WB.mkRawBlobRef Ref (WriteBufferBlobs m h)
wbblobs))

        filterWB :: Map SerialisedKey (Entry SerialisedValue BlobSpan)
-> Map SerialisedKey (Entry SerialisedValue BlobSpan)
filterWB = case OffsetKey
offsetKey of
            OffsetKey
NoOffsetKey -> Map SerialisedKey (Entry SerialisedValue BlobSpan)
-> Map SerialisedKey (Entry SerialisedValue BlobSpan)
forall a. a -> a
id
            OffsetKey SerialisedKey
k -> (SerialisedKey -> Bool)
-> Map SerialisedKey (Entry SerialisedValue BlobSpan)
-> Map SerialisedKey (Entry SerialisedValue BlobSpan)
forall k a. (k -> Bool) -> Map k a -> Map k a
Map.dropWhileAntitone (SerialisedKey -> SerialisedKey -> Bool
forall a. Ord a => a -> a -> Bool
< SerialisedKey
k)

{-# SPECIALISE close :: Readers IO (FS.Handle h) -> IO () #-}
-- | Clean up the resources held by the readers.
--
-- Only call this function when aborting before all readers have been drained!
close ::
     (MonadMask m, MonadSTM m, PrimMonad m)
  => Readers m h
  -> m ()
close :: forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, PrimMonad m) =>
Readers m h -> m ()
close Readers {MutableHeap (PrimState m) (ReadCtx m h)
MutVar (PrimState m) (ReadCtx m h)
readersHeap :: forall (m :: * -> *) h.
Readers m h -> MutableHeap (PrimState m) (ReadCtx m h)
readersNext :: forall (m :: * -> *) h.
Readers m h -> MutVar (PrimState m) (ReadCtx m h)
readersHeap :: MutableHeap (PrimState m) (ReadCtx m h)
readersNext :: MutVar (PrimState m) (ReadCtx m h)
..} = do
    ReadCtx {Reader m h
readCtxReader :: forall (m :: * -> *) h. ReadCtx m h -> Reader m h
readCtxReader :: Reader m h
readCtxReader} <- MutVar (PrimState m) (ReadCtx m h) -> m (ReadCtx m h)
forall (m :: * -> *) a.
PrimMonad m =>
MutVar (PrimState m) a -> m a
readMutVar MutVar (PrimState m) (ReadCtx m h)
readersNext
    Reader m h -> m ()
forall {h}. Reader m h -> m ()
closeReader Reader m h
readCtxReader
    m ()
closeHeap
  where
    closeReader :: Reader m h -> m ()
closeReader = \case
        ReadBuffer MutVar (PrimState m) [KOp m h]
_             -> () -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
        ReadRun RunReader m h
r                -> RunReader m h -> m ()
forall (m :: * -> *) h.
(MonadSTM m, MonadMask m, PrimMonad m) =>
RunReader m h -> m ()
RunReader.close RunReader m h
r
        ReadReaders ReadersMergeType
_ SMaybe (Readers m h)
readersMay -> m () -> (Readers m h -> m ()) -> SMaybe (Readers m h) -> m ()
forall b a. b -> (a -> b) -> SMaybe a -> b
smaybe (() -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()) Readers m h -> m ()
forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, PrimMonad m) =>
Readers m h -> m ()
close SMaybe (Readers m h)
readersMay
    closeHeap :: m ()
closeHeap =
        MutableHeap (PrimState m) (ReadCtx m h) -> m (Maybe (ReadCtx m h))
forall a (m :: * -> *).
(PrimMonad m, Ord a) =>
MutableHeap (PrimState m) a -> m (Maybe a)
Heap.extract MutableHeap (PrimState m) (ReadCtx m h)
readersHeap m (Maybe (ReadCtx m h)) -> (Maybe (ReadCtx m h) -> m ()) -> m ()
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
          Maybe (ReadCtx m h)
Nothing -> () -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
          Just ReadCtx {Reader m h
readCtxReader :: forall (m :: * -> *) h. ReadCtx m h -> Reader m h
readCtxReader :: Reader m h
readCtxReader} -> do
            Reader m h -> m ()
forall {h}. Reader m h -> m ()
closeReader Reader m h
readCtxReader
            m ()
closeHeap

{-# SPECIALISE peekKey :: Readers IO h -> IO SerialisedKey #-}
-- | Return the smallest key present in the readers, without consuming any
-- entries.
peekKey ::
     PrimMonad m
  => Readers m h
  -> m SerialisedKey
peekKey :: forall (m :: * -> *) h.
PrimMonad m =>
Readers m h -> m SerialisedKey
peekKey Readers {MutableHeap (PrimState m) (ReadCtx m h)
MutVar (PrimState m) (ReadCtx m h)
readersHeap :: forall (m :: * -> *) h.
Readers m h -> MutableHeap (PrimState m) (ReadCtx m h)
readersNext :: forall (m :: * -> *) h.
Readers m h -> MutVar (PrimState m) (ReadCtx m h)
readersHeap :: MutableHeap (PrimState m) (ReadCtx m h)
readersNext :: MutVar (PrimState m) (ReadCtx m h)
..} = do
    ReadCtx m h -> SerialisedKey
forall (m :: * -> *) h. ReadCtx m h -> SerialisedKey
readCtxHeadKey (ReadCtx m h -> SerialisedKey)
-> m (ReadCtx m h) -> m SerialisedKey
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> MutVar (PrimState m) (ReadCtx m h) -> m (ReadCtx m h)
forall (m :: * -> *) a.
PrimMonad m =>
MutVar (PrimState m) a -> m a
readMutVar MutVar (PrimState m) (ReadCtx m h)
readersNext

-- | Once a function returned 'Drained', do not use the 'Readers' any more!
data HasMore = HasMore | Drained
  deriving stock (HasMore -> HasMore -> Bool
(HasMore -> HasMore -> Bool)
-> (HasMore -> HasMore -> Bool) -> Eq HasMore
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: HasMore -> HasMore -> Bool
== :: HasMore -> HasMore -> Bool
$c/= :: HasMore -> HasMore -> Bool
/= :: HasMore -> HasMore -> Bool
Eq, Int -> HasMore -> ShowS
[HasMore] -> ShowS
HasMore -> String
(Int -> HasMore -> ShowS)
-> (HasMore -> String) -> ([HasMore] -> ShowS) -> Show HasMore
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> HasMore -> ShowS
showsPrec :: Int -> HasMore -> ShowS
$cshow :: HasMore -> String
show :: HasMore -> String
$cshowList :: [HasMore] -> ShowS
showList :: [HasMore] -> ShowS
Show)

{-# SPECIALISE pop ::
     ResolveSerialisedValue
  -> Readers IO h
  -> IO (SerialisedKey, RunReader.Entry IO h, HasMore) #-}
-- | Remove the entry with the smallest key and return it. If there are multiple
-- entries with that key, it removes the one from the source that came first
-- in list supplied to 'new'. No resolution of multiple entries takes place.
pop ::
     (MonadMask m, MonadSTM m, MonadST m)
  => ResolveSerialisedValue
  -> Readers m h
  -> m (SerialisedKey, RunReader.Entry m h, HasMore)
pop :: forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadST m) =>
ResolveSerialisedValue
-> Readers m h -> m (SerialisedKey, Entry m h, HasMore)
pop ResolveSerialisedValue
resolve r :: Readers m h
r@Readers {MutableHeap (PrimState m) (ReadCtx m h)
MutVar (PrimState m) (ReadCtx m h)
readersHeap :: forall (m :: * -> *) h.
Readers m h -> MutableHeap (PrimState m) (ReadCtx m h)
readersNext :: forall (m :: * -> *) h.
Readers m h -> MutVar (PrimState m) (ReadCtx m h)
readersHeap :: MutableHeap (PrimState m) (ReadCtx m h)
readersNext :: MutVar (PrimState m) (ReadCtx m h)
..} = do
    ReadCtx {SerialisedKey
Entry m h
Reader m h
ReaderNumber
readCtxHeadKey :: forall (m :: * -> *) h. ReadCtx m h -> SerialisedKey
readCtxHeadEntry :: forall (m :: * -> *) h. ReadCtx m h -> Entry m h
readCtxNumber :: forall (m :: * -> *) h. ReadCtx m h -> ReaderNumber
readCtxReader :: forall (m :: * -> *) h. ReadCtx m h -> Reader m h
readCtxHeadKey :: SerialisedKey
readCtxHeadEntry :: Entry m h
readCtxNumber :: ReaderNumber
readCtxReader :: Reader m h
..} <- MutVar (PrimState m) (ReadCtx m h) -> m (ReadCtx m h)
forall (m :: * -> *) a.
PrimMonad m =>
MutVar (PrimState m) a -> m a
readMutVar MutVar (PrimState m) (ReadCtx m h)
readersNext
    HasMore
hasMore <- ResolveSerialisedValue
-> Readers m h -> ReaderNumber -> Reader m h -> m HasMore
forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadST m) =>
ResolveSerialisedValue
-> Readers m h -> ReaderNumber -> Reader m h -> m HasMore
dropOne ResolveSerialisedValue
resolve Readers m h
r ReaderNumber
readCtxNumber Reader m h
readCtxReader
    (SerialisedKey, Entry m h, HasMore)
-> m (SerialisedKey, Entry m h, HasMore)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (SerialisedKey
readCtxHeadKey, Entry m h
readCtxHeadEntry, HasMore
hasMore)

-- TODO: avoid duplication with Merge.TreeMergeType?
data ReadersMergeType = MergeLevel | MergeUnion
  deriving stock (ReadersMergeType -> ReadersMergeType -> Bool
(ReadersMergeType -> ReadersMergeType -> Bool)
-> (ReadersMergeType -> ReadersMergeType -> Bool)
-> Eq ReadersMergeType
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: ReadersMergeType -> ReadersMergeType -> Bool
== :: ReadersMergeType -> ReadersMergeType -> Bool
$c/= :: ReadersMergeType -> ReadersMergeType -> Bool
/= :: ReadersMergeType -> ReadersMergeType -> Bool
Eq, Int -> ReadersMergeType -> ShowS
[ReadersMergeType] -> ShowS
ReadersMergeType -> String
(Int -> ReadersMergeType -> ShowS)
-> (ReadersMergeType -> String)
-> ([ReadersMergeType] -> ShowS)
-> Show ReadersMergeType
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> ReadersMergeType -> ShowS
showsPrec :: Int -> ReadersMergeType -> ShowS
$cshow :: ReadersMergeType -> String
show :: ReadersMergeType -> String
$cshowList :: [ReadersMergeType] -> ShowS
showList :: [ReadersMergeType] -> ShowS
Show)

{-# SPECIALISE popResolved ::
     ResolveSerialisedValue
  -> ReadersMergeType
  -> Readers IO h
  -> IO (SerialisedKey, RunReader.Entry IO h, HasMore) #-}
-- | Produces an entry with the smallest key, resolving all input entries if
-- there are multiple. Therefore, the next call to 'peekKey' will return a
-- larger key than the one returned here.
--
-- General notes on the code below:
-- * It is quite similar to the one in Internal.Cursor and Internal.Merge. Maybe
--   we can avoid some duplication.
-- * Any function that doesn't take a 'hasMore' argument assumes that the
--   readers have not been drained yet, so we must check before calling them.
-- * There is probably opportunity for optimisations.
--
-- TODO: use this function in Internal.Cursor? Measure performance impact.
popResolved ::
     forall h m.
     (MonadMask m, MonadST m, MonadSTM m)
  => ResolveSerialisedValue
  -> ReadersMergeType
  -> Readers m h
  -> m (SerialisedKey, RunReader.Entry m h, HasMore)
popResolved :: forall h (m :: * -> *).
(MonadMask m, MonadST m, MonadSTM m) =>
ResolveSerialisedValue
-> ReadersMergeType
-> Readers m h
-> m (SerialisedKey, Entry m h, HasMore)
popResolved ResolveSerialisedValue
resolve ReadersMergeType
mergeType Readers m h
readers = m (SerialisedKey, Entry m h, HasMore)
readEntry
  where
    readEntry :: m (SerialisedKey, RunReader.Entry m h, HasMore)
    readEntry :: m (SerialisedKey, Entry m h, HasMore)
readEntry = do
        (SerialisedKey
key, Entry m h
entry, HasMore
hasMore) <- ResolveSerialisedValue
-> Readers m h -> m (SerialisedKey, Entry m h, HasMore)
forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadST m) =>
ResolveSerialisedValue
-> Readers m h -> m (SerialisedKey, Entry m h, HasMore)
pop ResolveSerialisedValue
resolve Readers m h
readers
        case HasMore
hasMore of
          HasMore
Drained -> do
            (SerialisedKey, Entry m h, HasMore)
-> m (SerialisedKey, Entry m h, HasMore)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (SerialisedKey
key, Entry m h
entry, HasMore
Drained)
          HasMore
HasMore -> do
            case ReadersMergeType
mergeType of
              ReadersMergeType
MergeLevel -> SerialisedKey
-> Entry SerialisedValue (RawBlobRef m h)
-> m (SerialisedKey, Entry m h, HasMore)
handleLevel SerialisedKey
key (Entry m h -> Entry SerialisedValue (RawBlobRef m h)
forall (m :: * -> *) h.
Entry m h -> Entry SerialisedValue (RawBlobRef m h)
RunReader.toFullEntry Entry m h
entry)
              ReadersMergeType
MergeUnion -> SerialisedKey
-> Entry SerialisedValue (RawBlobRef m h)
-> m (SerialisedKey, Entry m h, HasMore)
handleUnion SerialisedKey
key (Entry m h -> Entry SerialisedValue (RawBlobRef m h)
forall (m :: * -> *) h.
Entry m h -> Entry SerialisedValue (RawBlobRef m h)
RunReader.toFullEntry Entry m h
entry)

    handleUnion :: SerialisedKey
                -> Entry SerialisedValue (RawBlobRef m h)
                -> m (SerialisedKey, RunReader.Entry m h, HasMore)
    handleUnion :: SerialisedKey
-> Entry SerialisedValue (RawBlobRef m h)
-> m (SerialisedKey, Entry m h, HasMore)
handleUnion SerialisedKey
key Entry SerialisedValue (RawBlobRef m h)
entry = do
        SerialisedKey
nextKey <- Readers m h -> m SerialisedKey
forall (m :: * -> *) h.
PrimMonad m =>
Readers m h -> m SerialisedKey
peekKey Readers m h
readers
        if SerialisedKey
nextKey SerialisedKey -> SerialisedKey -> Bool
forall a. Eq a => a -> a -> Bool
/= SerialisedKey
key
          then
            -- No more entries for same key, done.
            (SerialisedKey, Entry m h, HasMore)
-> m (SerialisedKey, Entry m h, HasMore)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (SerialisedKey
key, Entry SerialisedValue (RawBlobRef m h) -> Entry m h
forall (m :: * -> *) h.
Entry SerialisedValue (RawBlobRef m h) -> Entry m h
RunReader.Entry Entry SerialisedValue (RawBlobRef m h)
entry, HasMore
HasMore)
          else do
            (SerialisedKey
_, Entry m h
nextEntry, HasMore
hasMore) <- ResolveSerialisedValue
-> Readers m h -> m (SerialisedKey, Entry m h, HasMore)
forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadST m) =>
ResolveSerialisedValue
-> Readers m h -> m (SerialisedKey, Entry m h, HasMore)
pop ResolveSerialisedValue
resolve Readers m h
readers
            let resolved :: Entry SerialisedValue (RawBlobRef m h)
resolved = ResolveSerialisedValue
-> Entry SerialisedValue (RawBlobRef m h)
-> Entry SerialisedValue (RawBlobRef m h)
-> Entry SerialisedValue (RawBlobRef m h)
forall v b. (v -> v -> v) -> Entry v b -> Entry v b -> Entry v b
Entry.combineUnion ResolveSerialisedValue
resolve Entry SerialisedValue (RawBlobRef m h)
entry
                             (Entry m h -> Entry SerialisedValue (RawBlobRef m h)
forall (m :: * -> *) h.
Entry m h -> Entry SerialisedValue (RawBlobRef m h)
RunReader.toFullEntry Entry m h
nextEntry)
            case HasMore
hasMore of
              HasMore
HasMore -> SerialisedKey
-> Entry SerialisedValue (RawBlobRef m h)
-> m (SerialisedKey, Entry m h, HasMore)
handleUnion SerialisedKey
key Entry SerialisedValue (RawBlobRef m h)
resolved
              HasMore
Drained -> (SerialisedKey, Entry m h, HasMore)
-> m (SerialisedKey, Entry m h, HasMore)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (SerialisedKey
key, Entry SerialisedValue (RawBlobRef m h) -> Entry m h
forall (m :: * -> *) h.
Entry SerialisedValue (RawBlobRef m h) -> Entry m h
RunReader.Entry Entry SerialisedValue (RawBlobRef m h)
resolved, HasMore
Drained)

    handleLevel :: SerialisedKey
                -> Entry SerialisedValue (RawBlobRef m h)
                -> m (SerialisedKey, RunReader.Entry m h, HasMore)
    handleLevel :: SerialisedKey
-> Entry SerialisedValue (RawBlobRef m h)
-> m (SerialisedKey, Entry m h, HasMore)
handleLevel SerialisedKey
key Entry SerialisedValue (RawBlobRef m h)
entry =
        case Entry SerialisedValue (RawBlobRef m h)
entry of
          Upsert SerialisedValue
v ->
            SerialisedKey
-> SerialisedValue -> m (SerialisedKey, Entry m h, HasMore)
handleMupdate SerialisedKey
key SerialisedValue
v
          Entry SerialisedValue (RawBlobRef m h)
_ -> do
            -- Anything but Upsert supersedes all previous entries of
            -- the same key, so we can simply drop them and are done.
            HasMore
hasMore' <- SerialisedKey -> m HasMore
dropRemaining SerialisedKey
key
            (SerialisedKey, Entry m h, HasMore)
-> m (SerialisedKey, Entry m h, HasMore)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (SerialisedKey
key, Entry SerialisedValue (RawBlobRef m h) -> Entry m h
forall (m :: * -> *) h.
Entry SerialisedValue (RawBlobRef m h) -> Entry m h
RunReader.Entry Entry SerialisedValue (RawBlobRef m h)
entry, HasMore
hasMore')

    -- Resolve a 'Mupsert' value with the other entries of the same key.
    handleMupdate :: SerialisedKey
                  -> SerialisedValue
                  -> m (SerialisedKey, RunReader.Entry m h, HasMore)
    handleMupdate :: SerialisedKey
-> SerialisedValue -> m (SerialisedKey, Entry m h, HasMore)
handleMupdate SerialisedKey
key SerialisedValue
v = do
        SerialisedKey
nextKey <- Readers m h -> m SerialisedKey
forall (m :: * -> *) h.
PrimMonad m =>
Readers m h -> m SerialisedKey
peekKey Readers m h
readers
        if SerialisedKey
nextKey SerialisedKey -> SerialisedKey -> Bool
forall a. Eq a => a -> a -> Bool
/= SerialisedKey
key
          then
            -- No more entries for same key, done.
            (SerialisedKey, Entry m h, HasMore)
-> m (SerialisedKey, Entry m h, HasMore)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (SerialisedKey
key, Entry SerialisedValue (RawBlobRef m h) -> Entry m h
forall (m :: * -> *) h.
Entry SerialisedValue (RawBlobRef m h) -> Entry m h
RunReader.Entry (SerialisedValue -> Entry SerialisedValue (RawBlobRef m h)
forall v b. v -> Entry v b
Upsert SerialisedValue
v), HasMore
HasMore)
          else do
            (SerialisedKey
_, Entry m h
nextEntry, HasMore
hasMore) <- ResolveSerialisedValue
-> Readers m h -> m (SerialisedKey, Entry m h, HasMore)
forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadST m) =>
ResolveSerialisedValue
-> Readers m h -> m (SerialisedKey, Entry m h, HasMore)
pop ResolveSerialisedValue
resolve Readers m h
readers
            let resolved :: Entry SerialisedValue (RawBlobRef m h)
resolved = ResolveSerialisedValue
-> Entry SerialisedValue (RawBlobRef m h)
-> Entry SerialisedValue (RawBlobRef m h)
-> Entry SerialisedValue (RawBlobRef m h)
forall v b. (v -> v -> v) -> Entry v b -> Entry v b -> Entry v b
Entry.combine ResolveSerialisedValue
resolve (SerialisedValue -> Entry SerialisedValue (RawBlobRef m h)
forall v b. v -> Entry v b
Upsert SerialisedValue
v)
                             (Entry m h -> Entry SerialisedValue (RawBlobRef m h)
forall (m :: * -> *) h.
Entry m h -> Entry SerialisedValue (RawBlobRef m h)
RunReader.toFullEntry Entry m h
nextEntry)
            case HasMore
hasMore of
              HasMore
HasMore -> SerialisedKey
-> Entry SerialisedValue (RawBlobRef m h)
-> m (SerialisedKey, Entry m h, HasMore)
handleLevel SerialisedKey
key Entry SerialisedValue (RawBlobRef m h)
resolved
              HasMore
Drained -> (SerialisedKey, Entry m h, HasMore)
-> m (SerialisedKey, Entry m h, HasMore)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (SerialisedKey
key, Entry SerialisedValue (RawBlobRef m h) -> Entry m h
forall (m :: * -> *) h.
Entry SerialisedValue (RawBlobRef m h) -> Entry m h
RunReader.Entry Entry SerialisedValue (RawBlobRef m h)
resolved, HasMore
Drained)

    dropRemaining :: SerialisedKey -> m HasMore
    dropRemaining :: SerialisedKey -> m HasMore
dropRemaining SerialisedKey
key = do
        (Int
_, HasMore
hasMore) <- ResolveSerialisedValue
-> Readers m h -> SerialisedKey -> m (Int, HasMore)
forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadST m) =>
ResolveSerialisedValue
-> Readers m h -> SerialisedKey -> m (Int, HasMore)
dropWhileKey ResolveSerialisedValue
resolve Readers m h
readers SerialisedKey
key
        HasMore -> m HasMore
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure HasMore
hasMore

{-# SPECIALISE dropWhileKey ::
     ResolveSerialisedValue
  -> Readers IO h
  -> SerialisedKey
  -> IO (Int, HasMore) #-}
-- | Drop all entries with a key that is smaller or equal to the supplied one.
dropWhileKey ::
     (MonadMask m, MonadSTM m, MonadST m)
  => ResolveSerialisedValue
  -> Readers m h
  -> SerialisedKey
  -> m (Int, HasMore)  -- ^ How many were dropped?
dropWhileKey :: forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadST m) =>
ResolveSerialisedValue
-> Readers m h -> SerialisedKey -> m (Int, HasMore)
dropWhileKey ResolveSerialisedValue
resolve Readers {MutableHeap (PrimState m) (ReadCtx m h)
MutVar (PrimState m) (ReadCtx m h)
readersHeap :: forall (m :: * -> *) h.
Readers m h -> MutableHeap (PrimState m) (ReadCtx m h)
readersNext :: forall (m :: * -> *) h.
Readers m h -> MutVar (PrimState m) (ReadCtx m h)
readersHeap :: MutableHeap (PrimState m) (ReadCtx m h)
readersNext :: MutVar (PrimState m) (ReadCtx m h)
..} SerialisedKey
key = do
    ReadCtx m h
cur <- MutVar (PrimState m) (ReadCtx m h) -> m (ReadCtx m h)
forall (m :: * -> *) a.
PrimMonad m =>
MutVar (PrimState m) a -> m a
readMutVar MutVar (PrimState m) (ReadCtx m h)
readersNext
    if ReadCtx m h -> SerialisedKey
forall (m :: * -> *) h. ReadCtx m h -> SerialisedKey
readCtxHeadKey ReadCtx m h
cur SerialisedKey -> SerialisedKey -> Bool
forall a. Ord a => a -> a -> Bool
<= SerialisedKey
key
      then Int -> ReadCtx m h -> m (Int, HasMore)
go Int
0 ReadCtx m h
cur
      else (Int, HasMore) -> m (Int, HasMore)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int
0, HasMore
HasMore)  -- nothing to do
  where
    -- invariant: @readCtxHeadKey <= key@
    go :: Int -> ReadCtx m h -> m (Int, HasMore)
go !Int
n ReadCtx {ReaderNumber
readCtxNumber :: forall (m :: * -> *) h. ReadCtx m h -> ReaderNumber
readCtxNumber :: ReaderNumber
readCtxNumber, Reader m h
readCtxReader :: forall (m :: * -> *) h. ReadCtx m h -> Reader m h
readCtxReader :: Reader m h
readCtxReader} = do
        Maybe (ReadCtx m h)
mNext <- ResolveSerialisedValue
-> ReaderNumber -> Reader m h -> m (Maybe (ReadCtx m h))
forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadST m) =>
ResolveSerialisedValue
-> ReaderNumber -> Reader m h -> m (Maybe (ReadCtx m h))
nextReadCtx ResolveSerialisedValue
resolve ReaderNumber
readCtxNumber Reader m h
readCtxReader m (Maybe (ReadCtx m h))
-> (Maybe (ReadCtx m h) -> m (Maybe (ReadCtx m h)))
-> m (Maybe (ReadCtx m h))
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
          Maybe (ReadCtx m h)
Nothing  -> MutableHeap (PrimState m) (ReadCtx m h) -> m (Maybe (ReadCtx m h))
forall a (m :: * -> *).
(PrimMonad m, Ord a) =>
MutableHeap (PrimState m) a -> m (Maybe a)
Heap.extract MutableHeap (PrimState m) (ReadCtx m h)
readersHeap
          Just ReadCtx m h
ctx -> ReadCtx m h -> Maybe (ReadCtx m h)
forall a. a -> Maybe a
Just (ReadCtx m h -> Maybe (ReadCtx m h))
-> m (ReadCtx m h) -> m (Maybe (ReadCtx m h))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> MutableHeap (PrimState m) (ReadCtx m h)
-> ReadCtx m h -> m (ReadCtx m h)
forall a (m :: * -> *).
(PrimMonad m, Ord a) =>
MutableHeap (PrimState m) a -> a -> m a
Heap.replaceRoot MutableHeap (PrimState m) (ReadCtx m h)
readersHeap ReadCtx m h
ctx
        let !n' :: Int
n' = Int
n Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1
        case Maybe (ReadCtx m h)
mNext of
          Maybe (ReadCtx m h)
Nothing -> do
            (Int, HasMore) -> m (Int, HasMore)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int
n', HasMore
Drained)
          Just ReadCtx m h
next -> do
            -- hasMore
            if ReadCtx m h -> SerialisedKey
forall (m :: * -> *) h. ReadCtx m h -> SerialisedKey
readCtxHeadKey ReadCtx m h
next SerialisedKey -> SerialisedKey -> Bool
forall a. Ord a => a -> a -> Bool
<= SerialisedKey
key
              then
                Int -> ReadCtx m h -> m (Int, HasMore)
go Int
n' ReadCtx m h
next
              else do
                MutVar (PrimState m) (ReadCtx m h) -> ReadCtx m h -> m ()
forall (m :: * -> *) a.
PrimMonad m =>
MutVar (PrimState m) a -> a -> m ()
writeMutVar MutVar (PrimState m) (ReadCtx m h)
readersNext ReadCtx m h
next
                (Int, HasMore) -> m (Int, HasMore)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int
n', HasMore
HasMore)

{-# SPECIALISE dropOne ::
     ResolveSerialisedValue
  -> Readers IO h
  -> ReaderNumber
  -> Reader IO h
  -> IO HasMore #-}
dropOne ::
     (MonadMask m, MonadSTM m, MonadST m)
  => ResolveSerialisedValue
  -> Readers m h
  -> ReaderNumber
  -> Reader m h
  -> m HasMore
dropOne :: forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadST m) =>
ResolveSerialisedValue
-> Readers m h -> ReaderNumber -> Reader m h -> m HasMore
dropOne ResolveSerialisedValue
resolve Readers {MutableHeap (PrimState m) (ReadCtx m h)
MutVar (PrimState m) (ReadCtx m h)
readersHeap :: forall (m :: * -> *) h.
Readers m h -> MutableHeap (PrimState m) (ReadCtx m h)
readersNext :: forall (m :: * -> *) h.
Readers m h -> MutVar (PrimState m) (ReadCtx m h)
readersHeap :: MutableHeap (PrimState m) (ReadCtx m h)
readersNext :: MutVar (PrimState m) (ReadCtx m h)
..} ReaderNumber
number Reader m h
reader = do
    Maybe (ReadCtx m h)
mNext <- ResolveSerialisedValue
-> ReaderNumber -> Reader m h -> m (Maybe (ReadCtx m h))
forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadST m) =>
ResolveSerialisedValue
-> ReaderNumber -> Reader m h -> m (Maybe (ReadCtx m h))
nextReadCtx ResolveSerialisedValue
resolve ReaderNumber
number Reader m h
reader m (Maybe (ReadCtx m h))
-> (Maybe (ReadCtx m h) -> m (Maybe (ReadCtx m h)))
-> m (Maybe (ReadCtx m h))
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
      Maybe (ReadCtx m h)
Nothing  -> MutableHeap (PrimState m) (ReadCtx m h) -> m (Maybe (ReadCtx m h))
forall a (m :: * -> *).
(PrimMonad m, Ord a) =>
MutableHeap (PrimState m) a -> m (Maybe a)
Heap.extract MutableHeap (PrimState m) (ReadCtx m h)
readersHeap
      Just ReadCtx m h
ctx -> ReadCtx m h -> Maybe (ReadCtx m h)
forall a. a -> Maybe a
Just (ReadCtx m h -> Maybe (ReadCtx m h))
-> m (ReadCtx m h) -> m (Maybe (ReadCtx m h))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> MutableHeap (PrimState m) (ReadCtx m h)
-> ReadCtx m h -> m (ReadCtx m h)
forall a (m :: * -> *).
(PrimMonad m, Ord a) =>
MutableHeap (PrimState m) a -> a -> m a
Heap.replaceRoot MutableHeap (PrimState m) (ReadCtx m h)
readersHeap ReadCtx m h
ctx
    case Maybe (ReadCtx m h)
mNext of
      Maybe (ReadCtx m h)
Nothing ->
        HasMore -> m HasMore
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure HasMore
Drained
      Just ReadCtx m h
next -> do
        MutVar (PrimState m) (ReadCtx m h) -> ReadCtx m h -> m ()
forall (m :: * -> *) a.
PrimMonad m =>
MutVar (PrimState m) a -> a -> m ()
writeMutVar MutVar (PrimState m) (ReadCtx m h)
readersNext ReadCtx m h
next
        HasMore -> m HasMore
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure HasMore
HasMore

{-# SPECIALISE nextReadCtx ::
     ResolveSerialisedValue
  -> ReaderNumber
  -> Reader IO h
  -> IO (Maybe (ReadCtx IO h)) #-}
nextReadCtx ::
     (MonadMask m, MonadSTM m, MonadST m)
  => ResolveSerialisedValue
  -> ReaderNumber
  -> Reader m h
  -> m (Maybe (ReadCtx m h))
nextReadCtx :: forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadST m) =>
ResolveSerialisedValue
-> ReaderNumber -> Reader m h -> m (Maybe (ReadCtx m h))
nextReadCtx ResolveSerialisedValue
resolve ReaderNumber
readCtxNumber Reader m h
readCtxReader =
    case Reader m h
readCtxReader of
      ReadBuffer MutVar (PrimState m) [KOp m h]
r -> MutVar (PrimState m) [KOp m h]
-> ([KOp m h] -> ([KOp m h], Maybe (ReadCtx m h)))
-> m (Maybe (ReadCtx m h))
forall (m :: * -> *) a b.
PrimMonad m =>
MutVar (PrimState m) a -> (a -> (a, b)) -> m b
atomicModifyMutVar MutVar (PrimState m) [KOp m h]
r (([KOp m h] -> ([KOp m h], Maybe (ReadCtx m h)))
 -> m (Maybe (ReadCtx m h)))
-> ([KOp m h] -> ([KOp m h], Maybe (ReadCtx m h)))
-> m (Maybe (ReadCtx m h))
forall a b. (a -> b) -> a -> b
$ \case
        [] ->
          ([], Maybe (ReadCtx m h)
forall a. Maybe a
Nothing)
        ((SerialisedKey
readCtxHeadKey, Entry SerialisedValue (RawBlobRef m h)
e) : [KOp m h]
rest) ->
          let readCtxHeadEntry :: Entry m h
readCtxHeadEntry = Entry SerialisedValue (RawBlobRef m h) -> Entry m h
forall (m :: * -> *) h.
Entry SerialisedValue (RawBlobRef m h) -> Entry m h
RunReader.Entry Entry SerialisedValue (RawBlobRef m h)
e
          in  ([KOp m h]
rest, ReadCtx m h -> Maybe (ReadCtx m h)
forall a. a -> Maybe a
Just ReadCtx {SerialisedKey
Entry m h
Reader m h
ReaderNumber
readCtxHeadKey :: SerialisedKey
readCtxHeadEntry :: Entry m h
readCtxNumber :: ReaderNumber
readCtxReader :: Reader m h
readCtxNumber :: ReaderNumber
readCtxReader :: Reader m h
readCtxHeadKey :: SerialisedKey
readCtxHeadEntry :: Entry m h
..})
      ReadRun RunReader m h
r -> RunReader m h -> m (Result m h)
forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadST m) =>
RunReader m h -> m (Result m h)
RunReader.next RunReader m h
r m (Result m h)
-> (Result m h -> Maybe (ReadCtx m h)) -> m (Maybe (ReadCtx m h))
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \case
        Result m h
RunReader.Empty ->
          Maybe (ReadCtx m h)
forall a. Maybe a
Nothing
        RunReader.ReadEntry SerialisedKey
readCtxHeadKey Entry m h
readCtxHeadEntry ->
          ReadCtx m h -> Maybe (ReadCtx m h)
forall a. a -> Maybe a
Just ReadCtx {SerialisedKey
Entry m h
Reader m h
ReaderNumber
readCtxHeadKey :: SerialisedKey
readCtxHeadEntry :: Entry m h
readCtxNumber :: ReaderNumber
readCtxReader :: Reader m h
readCtxNumber :: ReaderNumber
readCtxReader :: Reader m h
readCtxHeadKey :: SerialisedKey
readCtxHeadEntry :: Entry m h
..}
      ReadReaders ReadersMergeType
mergeType SMaybe (Readers m h)
readersMay -> case SMaybe (Readers m h)
readersMay of
        SMaybe (Readers m h)
SNothing ->
          Maybe (ReadCtx m h) -> m (Maybe (ReadCtx m h))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe (ReadCtx m h)
forall a. Maybe a
Nothing
        SJust Readers m h
readers -> do
          (SerialisedKey
readCtxHeadKey, Entry m h
readCtxHeadEntry, HasMore
hasMore) <-
            ResolveSerialisedValue
-> ReadersMergeType
-> Readers m h
-> m (SerialisedKey, Entry m h, HasMore)
forall h (m :: * -> *).
(MonadMask m, MonadST m, MonadSTM m) =>
ResolveSerialisedValue
-> ReadersMergeType
-> Readers m h
-> m (SerialisedKey, Entry m h, HasMore)
popResolved ResolveSerialisedValue
resolve ReadersMergeType
mergeType Readers m h
readers
          let readersMay' :: SMaybe (Readers m h)
readersMay' = case HasMore
hasMore of
                HasMore
Drained -> SMaybe (Readers m h)
forall a. SMaybe a
SNothing
                HasMore
HasMore -> Readers m h -> SMaybe (Readers m h)
forall a. a -> SMaybe a
SJust Readers m h
readers
          Maybe (ReadCtx m h) -> m (Maybe (ReadCtx m h))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe (ReadCtx m h) -> m (Maybe (ReadCtx m h)))
-> Maybe (ReadCtx m h) -> m (Maybe (ReadCtx m h))
forall a b. (a -> b) -> a -> b
$ ReadCtx m h -> Maybe (ReadCtx m h)
forall a. a -> Maybe a
Just ReadCtx {
              -- TODO: reduce allocations?
              readCtxReader :: Reader m h
readCtxReader = ReadersMergeType -> SMaybe (Readers m h) -> Reader m h
forall (m :: * -> *) h.
ReadersMergeType -> SMaybe (Readers m h) -> Reader m h
ReadReaders ReadersMergeType
mergeType SMaybe (Readers m h)
readersMay'
            , SerialisedKey
Entry m h
ReaderNumber
readCtxHeadKey :: SerialisedKey
readCtxHeadEntry :: Entry m h
readCtxNumber :: ReaderNumber
readCtxNumber :: ReaderNumber
readCtxHeadKey :: SerialisedKey
readCtxHeadEntry :: Entry m h
..
          }