{-# LANGUAGE DataKinds #-}
{-# OPTIONS_HADDOCK not-home #-}

module Database.LSMTree.Internal.Cursor (
    readEntriesWhile
  ) where

import           Control.Concurrent.Class.MonadSTM (MonadSTM (..))
import           Control.Monad.Class.MonadST (MonadST (..))
import           Control.Monad.Class.MonadThrow
import qualified Data.Vector as V
import           Database.LSMTree.Internal.BlobRef (RawBlobRef,
                     WeakBlobRef (..))
import qualified Database.LSMTree.Internal.BlobRef as BlobRef
import           Database.LSMTree.Internal.Entry (Entry)
import qualified Database.LSMTree.Internal.Entry as Entry
import           Database.LSMTree.Internal.Lookup (ResolveSerialisedValue)
import qualified Database.LSMTree.Internal.RunReader as Reader
import qualified Database.LSMTree.Internal.RunReaders as Readers
import           Database.LSMTree.Internal.Serialise (SerialisedKey,
                     SerialisedValue)
import qualified Database.LSMTree.Internal.Vector as V

{-# INLINE readEntriesWhile #-}
{-# SPECIALISE readEntriesWhile :: forall h res.
     ResolveSerialisedValue
  -> (SerialisedKey -> Bool)
  -> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef IO h) -> res)
  -> Readers.Readers IO h
  -> Int
  -> IO (V.Vector res, Readers.HasMore) #-}
-- | General notes on the code below:
-- * it is quite similar to the one in Internal.Merge, but different enough
--   that it's probably easier to keep them separate
-- * any function that doesn't take a 'hasMore' argument assumes that the
--   readers have not been drained yet, so we must check before calling them
-- * there is probably opportunity for optimisations
readEntriesWhile :: forall h m res.
     (MonadMask m, MonadST m, MonadSTM m)
  => ResolveSerialisedValue
  -> (SerialisedKey -> Bool)
  -> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef m h) -> res)
  -> Readers.Readers m h
  -> Int
  -> m (V.Vector res, Readers.HasMore)
readEntriesWhile :: forall h (m :: * -> *) res.
(MonadMask m, MonadST m, MonadSTM m) =>
ResolveSerialisedValue
-> (SerialisedKey -> Bool)
-> (SerialisedKey
    -> SerialisedValue -> Maybe (WeakBlobRef m h) -> res)
-> Readers m h
-> Int
-> m (Vector res, HasMore)
readEntriesWhile ResolveSerialisedValue
resolve SerialisedKey -> Bool
keyIsWanted SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef m h) -> res
fromEntry Readers m h
readers Int
n =
    ((HasMore -> m (Maybe res, HasMore))
 -> HasMore -> m (Vector res, HasMore))
-> HasMore
-> (HasMore -> m (Maybe res, HasMore))
-> m (Vector res, HasMore)
forall a b c. (a -> b -> c) -> b -> a -> c
flip (Int
-> (HasMore -> m (Maybe res, HasMore))
-> HasMore
-> m (Vector res, HasMore)
forall (m :: * -> *) b a.
PrimMonad m =>
Int -> (b -> m (Maybe a, b)) -> b -> m (Vector a, b)
V.unfoldrNM' Int
n) HasMore
Readers.HasMore ((HasMore -> m (Maybe res, HasMore)) -> m (Vector res, HasMore))
-> (HasMore -> m (Maybe res, HasMore)) -> m (Vector res, HasMore)
forall a b. (a -> b) -> a -> b
$ \case
      HasMore
Readers.Drained -> (Maybe res, HasMore) -> m (Maybe res, HasMore)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe res
forall a. Maybe a
Nothing, HasMore
Readers.Drained)
      HasMore
Readers.HasMore -> m (Maybe res, HasMore)
readEntryIfWanted
  where
    -- Produces a result unless the readers have been drained or 'keyIsWanted'
    -- returned False.
    readEntryIfWanted :: m (Maybe res, Readers.HasMore)
    readEntryIfWanted :: m (Maybe res, HasMore)
readEntryIfWanted = do
        SerialisedKey
key <- Readers m h -> m SerialisedKey
forall (m :: * -> *) h.
PrimMonad m =>
Readers m h -> m SerialisedKey
Readers.peekKey Readers m h
readers
        if SerialisedKey -> Bool
keyIsWanted SerialisedKey
key then m (Maybe res, HasMore)
readEntry
                           else (Maybe res, HasMore) -> m (Maybe res, HasMore)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe res
forall a. Maybe a
Nothing, HasMore
Readers.HasMore)

    readEntry :: m (Maybe res, Readers.HasMore)
    readEntry :: m (Maybe res, HasMore)
readEntry = do
        (SerialisedKey
key, Entry m h
readerEntry, HasMore
hasMore) <- Readers m h -> m (SerialisedKey, Entry m h, HasMore)
forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadST m) =>
Readers m h -> m (SerialisedKey, Entry m h, HasMore)
Readers.pop Readers m h
readers
        let !entry :: Entry SerialisedValue (RawBlobRef m h)
entry = Entry m h -> Entry SerialisedValue (RawBlobRef m h)
forall (m :: * -> *) h.
Entry m h -> Entry SerialisedValue (RawBlobRef m h)
Reader.toFullEntry Entry m h
readerEntry
        case HasMore
hasMore of
          HasMore
Readers.Drained -> do
            SerialisedKey
-> Entry SerialisedValue (RawBlobRef m h)
-> HasMore
-> m (Maybe res, HasMore)
handleResolved SerialisedKey
key Entry SerialisedValue (RawBlobRef m h)
entry HasMore
Readers.Drained
          HasMore
Readers.HasMore -> do
            case Entry SerialisedValue (RawBlobRef m h)
entry of
              Entry.Mupdate SerialisedValue
v ->
                SerialisedKey -> SerialisedValue -> m (Maybe res, HasMore)
handleMupdate SerialisedKey
key SerialisedValue
v
              Entry SerialisedValue (RawBlobRef m h)
_ -> do
                -- Anything but Mupdate supersedes all previous entries of
                -- the same key, so we can simply drop them and are done.
                HasMore
hasMore' <- SerialisedKey -> m HasMore
dropRemaining SerialisedKey
key
                SerialisedKey
-> Entry SerialisedValue (RawBlobRef m h)
-> HasMore
-> m (Maybe res, HasMore)
handleResolved SerialisedKey
key Entry SerialisedValue (RawBlobRef m h)
entry HasMore
hasMore'

    dropRemaining :: SerialisedKey -> m Readers.HasMore
    dropRemaining :: SerialisedKey -> m HasMore
dropRemaining SerialisedKey
key = do
        (Int
_, HasMore
hasMore) <- Readers m h -> SerialisedKey -> m (Int, HasMore)
forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadST m) =>
Readers m h -> SerialisedKey -> m (Int, HasMore)
Readers.dropWhileKey Readers m h
readers SerialisedKey
key
        HasMore -> m HasMore
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return HasMore
hasMore

    -- Resolve a 'Mupsert' value with the other entries of the same key.
    handleMupdate :: SerialisedKey
                  -> SerialisedValue
                  -> m (Maybe res, Readers.HasMore)
    handleMupdate :: SerialisedKey -> SerialisedValue -> m (Maybe res, HasMore)
handleMupdate SerialisedKey
key SerialisedValue
v = do
        SerialisedKey
nextKey <- Readers m h -> m SerialisedKey
forall (m :: * -> *) h.
PrimMonad m =>
Readers m h -> m SerialisedKey
Readers.peekKey Readers m h
readers
        if SerialisedKey
nextKey SerialisedKey -> SerialisedKey -> Bool
forall a. Eq a => a -> a -> Bool
/= SerialisedKey
key
          then
            -- No more entries for same key, done.
            SerialisedKey
-> Entry SerialisedValue (RawBlobRef m h)
-> HasMore
-> m (Maybe res, HasMore)
handleResolved SerialisedKey
key (SerialisedValue -> Entry SerialisedValue (RawBlobRef m h)
forall v b. v -> Entry v b
Entry.Mupdate SerialisedValue
v) HasMore
Readers.HasMore
          else do
            (SerialisedKey
_, Entry m h
nextEntry, HasMore
hasMore) <- Readers m h -> m (SerialisedKey, Entry m h, HasMore)
forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadST m) =>
Readers m h -> m (SerialisedKey, Entry m h, HasMore)
Readers.pop Readers m h
readers
            let resolved :: Entry SerialisedValue (RawBlobRef m h)
resolved = ResolveSerialisedValue
-> Entry SerialisedValue (RawBlobRef m h)
-> Entry SerialisedValue (RawBlobRef m h)
-> Entry SerialisedValue (RawBlobRef m h)
forall v b. (v -> v -> v) -> Entry v b -> Entry v b -> Entry v b
Entry.combine ResolveSerialisedValue
resolve (SerialisedValue -> Entry SerialisedValue (RawBlobRef m h)
forall v b. v -> Entry v b
Entry.Mupdate SerialisedValue
v)
                             (Entry m h -> Entry SerialisedValue (RawBlobRef m h)
forall (m :: * -> *) h.
Entry m h -> Entry SerialisedValue (RawBlobRef m h)
Reader.toFullEntry Entry m h
nextEntry)
            case HasMore
hasMore of
              HasMore
Readers.HasMore -> case Entry SerialisedValue (RawBlobRef m h)
resolved of
                Entry.Mupdate SerialisedValue
v' ->
                  -- Still a mupsert, keep resolving!
                  SerialisedKey -> SerialisedValue -> m (Maybe res, HasMore)
handleMupdate SerialisedKey
key SerialisedValue
v'
                Entry SerialisedValue (RawBlobRef m h)
_ -> do
                  -- Done with this key, remaining entries are obsolete.
                  HasMore
hasMore' <- SerialisedKey -> m HasMore
dropRemaining SerialisedKey
key
                  SerialisedKey
-> Entry SerialisedValue (RawBlobRef m h)
-> HasMore
-> m (Maybe res, HasMore)
handleResolved SerialisedKey
key Entry SerialisedValue (RawBlobRef m h)
resolved HasMore
hasMore'
              HasMore
Readers.Drained -> do
                SerialisedKey
-> Entry SerialisedValue (RawBlobRef m h)
-> HasMore
-> m (Maybe res, HasMore)
handleResolved SerialisedKey
key Entry SerialisedValue (RawBlobRef m h)
resolved HasMore
Readers.Drained

    -- Once we have a resolved entry, we still have to make sure it's not
    -- a 'Delete', since we only want to write values to the result vector.
    handleResolved :: SerialisedKey
                   -> Entry SerialisedValue (RawBlobRef m h)
                   -> Readers.HasMore
                   -> m (Maybe res, Readers.HasMore)
    handleResolved :: SerialisedKey
-> Entry SerialisedValue (RawBlobRef m h)
-> HasMore
-> m (Maybe res, HasMore)
handleResolved SerialisedKey
key Entry SerialisedValue (RawBlobRef m h)
entry HasMore
hasMore =
        case SerialisedKey
-> Entry SerialisedValue (RawBlobRef m h) -> Maybe res
toResult SerialisedKey
key Entry SerialisedValue (RawBlobRef m h)
entry of
          Just !res
res ->
            -- Found one resolved value, done.
            (Maybe res, HasMore) -> m (Maybe res, HasMore)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (res -> Maybe res
forall a. a -> Maybe a
Just res
res, HasMore
hasMore)
          Maybe res
Nothing ->
            -- Resolved value was a Delete, which we don't want to include.
            -- So look for another one (unless there are no more entries!).
            case HasMore
hasMore of
              HasMore
Readers.HasMore -> m (Maybe res, HasMore)
readEntryIfWanted
              HasMore
Readers.Drained -> (Maybe res, HasMore) -> m (Maybe res, HasMore)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe res
forall a. Maybe a
Nothing, HasMore
Readers.Drained)

    toResult :: SerialisedKey
             -> Entry SerialisedValue (RawBlobRef m h)
             -> Maybe res
    toResult :: SerialisedKey
-> Entry SerialisedValue (RawBlobRef m h) -> Maybe res
toResult SerialisedKey
key = \case
        Entry.Insert SerialisedValue
v -> res -> Maybe res
forall a. a -> Maybe a
Just (res -> Maybe res) -> res -> Maybe res
forall a b. (a -> b) -> a -> b
$ SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef m h) -> res
fromEntry SerialisedKey
key SerialisedValue
v Maybe (WeakBlobRef m h)
forall a. Maybe a
Nothing
        Entry.InsertWithBlob SerialisedValue
v RawBlobRef m h
b -> res -> Maybe res
forall a. a -> Maybe a
Just (res -> Maybe res) -> res -> Maybe res
forall a b. (a -> b) -> a -> b
$ SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef m h) -> res
fromEntry SerialisedKey
key SerialisedValue
v (WeakBlobRef m h -> Maybe (WeakBlobRef m h)
forall a. a -> Maybe a
Just (RawBlobRef m h -> WeakBlobRef m h
forall (m :: * -> *) h. RawBlobRef m h -> WeakBlobRef m h
BlobRef.rawToWeakBlobRef RawBlobRef m h
b))
        Entry.Mupdate SerialisedValue
v -> res -> Maybe res
forall a. a -> Maybe a
Just (res -> Maybe res) -> res -> Maybe res
forall a b. (a -> b) -> a -> b
$ SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef m h) -> res
fromEntry SerialisedKey
key SerialisedValue
v Maybe (WeakBlobRef m h)
forall a. Maybe a
Nothing
        Entry SerialisedValue (RawBlobRef m h)
Entry.Delete -> Maybe res
forall a. Maybe a
Nothing