{-# OPTIONS_HADDOCK not-home #-}

-- | A run that is being read incrementally.
--
module Database.LSMTree.Internal.RunReader (
    RunReader (..)
  , OffsetKey (..)
  , new
  , next
  , close
  , Result (..)
  , Entry (..)
  , toFullEntry
  , appendOverflow
    -- * Exported for WriteBufferReader
  , mkEntryOverflow
  , readDiskPage
  , readOverflowPages
  ) where

import           Control.Exception (assert)
import           Control.Monad (guard, when)
import           Control.Monad.Class.MonadST (MonadST (..))
import           Control.Monad.Class.MonadSTM (MonadSTM (..))
import           Control.Monad.Class.MonadThrow (MonadCatch (..),
                     MonadMask (..), MonadThrow (..))
import           Control.Monad.Primitive (PrimMonad (..))
import           Control.RefCount
import           Data.Bifunctor (first)
import           Data.Maybe (isNothing)
import           Data.Primitive.ByteArray (newPinnedByteArray,
                     unsafeFreezeByteArray)
import           Data.Primitive.MutVar (MutVar, newMutVar, readMutVar,
                     writeMutVar)
import           Data.Primitive.PrimVar
import           Data.Word (Word16, Word32)
import           Database.LSMTree.Internal.BitMath (ceilDivPageSize,
                     mulPageSize, roundUpToPageSize)
import           Database.LSMTree.Internal.BlobFile as BlobFile
import           Database.LSMTree.Internal.BlobRef as BlobRef
import qualified Database.LSMTree.Internal.Entry as E
import qualified Database.LSMTree.Internal.Index as Index (search)
import           Database.LSMTree.Internal.Page (PageNo (..), PageSpan (..),
                     getNumPages, nextPageNo)
import           Database.LSMTree.Internal.Paths
import qualified Database.LSMTree.Internal.RawBytes as RB
import           Database.LSMTree.Internal.RawOverflowPage (RawOverflowPage,
                     pinnedByteArrayToOverflowPages, rawOverflowPageRawBytes)
import           Database.LSMTree.Internal.RawPage
import qualified Database.LSMTree.Internal.Run as Run
import           Database.LSMTree.Internal.Serialise
import qualified System.FS.API as FS
import           System.FS.API (HasFS)
import qualified System.FS.BlockIO.API as FS
import           System.FS.BlockIO.API (HasBlockIO)

-- | Allows reading the k\/ops of a run incrementally, using its own read-only
-- file handle and in-memory cache of the current disk page.
--
-- Creating a 'RunReader' does not retain a reference to the 'Run', but does
-- retain an independent reference on the run's blob file. It is not necessary
-- to separately retain the 'Run' for correct use of the 'RunReader'. There is
-- one important caveat however: the 'RunReader' maintains the validity of
-- 'BlobRef's only up until the point where the reader is drained (or
-- explicitly closed). In particular this means 'BlobRef's can be invalidated
-- as soon as 'next' returns 'Empty'. If this is not sufficient then it is
-- necessary to separately retain a reference to the 'Run' or its 'BlobFile' to
-- preserve the validity of 'BlobRef's.
--
-- New pages are loaded when trying to read their first entry.
--
-- TODO(optimise): Reuse page buffers using some kind of allocator. However,
-- deciding how long a page needs to stay around is not trivial.
data RunReader m h = RunReader {
      -- | The disk page currently being read. If it is 'Nothing', the reader
      -- is considered closed.
      readerCurrentPage    :: !(MutVar (PrimState m) (Maybe RawPage))
      -- | The index of the entry to be returned by the next call to 'next'.
    , readerCurrentEntryNo :: !(PrimVar (PrimState m) Word16)
      -- | Read mode file handle into the run's k\/ops file. We rely on it to
      -- track the position of the next disk page to read, instead of keeping
      -- a counter ourselves. Also, the run's handle is supposed to be opened
      -- with @O_DIRECT@, which is counterproductive here.
    , readerKOpsHandle     :: !(FS.Handle h)
      -- | The blob file from the run this reader is reading from.
    , readerBlobFile       :: !(Ref (BlobFile m h))
      -- | Caching policy copied from the run. 'close' consults it to decide
      -- whether the k\/ops file should be dropped from the OS page cache.
    , readerRunDataCaching :: !Run.RunDataCaching
      -- | File system interface copied from the run; used for all reads,
      -- seeks and for closing 'readerKOpsHandle'.
    , readerHasFS          :: !(HasFS m h)
      -- | Block I\/O interface copied from the run; used for read-ahead
      -- advice and for dropping the file from the page cache.
    , readerHasBlockIO     :: !(HasBlockIO m h)
    }

-- | Where a freshly created 'RunReader' (see 'new') starts reading.
--
-- * 'NoOffsetKey': start with the first entry of the first page of the run.
--
-- * 'OffsetKey': locate the page for the given key through the run's index
--   and start at the first entry on it whose key is greater than or equal to
--   the given key, or at the following page if there is no such entry.
data OffsetKey = NoOffsetKey | OffsetKey !SerialisedKey

{-# SPECIALISE new ::
     OffsetKey
  -> Ref (Run.Run IO h)
  -> IO (RunReader IO h) #-}
-- | Create a reader over the k\/ops file of the given run.
--
-- Opens a fresh read-mode handle on the run's k\/ops file, duplicates the
-- reference to the run's blob file and positions the reader according to the
-- 'OffsetKey'. If no page can be loaded at that position, the reader is closed
-- right away, so the first call to 'next' returns 'Empty'.
new :: forall m h.
     (MonadMask m, MonadSTM m, PrimMonad m)
  => OffsetKey
  -> Ref (Run.Run m h)
  -> m (RunReader m h)
new !offsetKey
    readerRun@(DeRef Run.Run {
      runBlobFile,
      runRunDataCaching = readerRunDataCaching,
      runHasFS          = readerHasFS,
      runHasBlockIO     = readerHasBlockIO,
      runIndex          = index
    }) = do
    -- NOTE(review): nothing below releases this handle (or the duplicated blob
    -- file reference) if a later step throws; confirm whether callers rely on
    -- cleanup in that case.
    (readerKOpsHandle :: FS.Handle h) <-
      FS.hOpen readerHasFS (runKOpsPath (Run.runFsPaths readerRun)) FS.ReadMode >>= \h -> do
        -- Sanity check: the file must hold exactly as many pages as the run
        -- claims to have.
        fileSize <- FS.hGetSize readerHasFS h
        let fileSizeInPages = fileSize `div` toEnum pageSize
        let indexedPages = getNumPages $ Run.sizeInPages readerRun
        assert (indexedPages == fileSizeInPages) $ pure h
    -- Advise the OS that this file is being read sequentially, which will
    -- double the readahead window in response (only for this file descriptor)
    FS.hAdviseAll readerHasBlockIO readerKOpsHandle FS.AdviceSequential

    (page, entryNo) <- seekFirstEntry readerKOpsHandle

    readerBlobFile <- dupRef runBlobFile
    readerCurrentEntryNo <- newPrimVar entryNo
    readerCurrentPage <- newMutVar page
    let reader = RunReader {..}

    -- Nothing to read: release the resources immediately. The current page is
    -- 'Nothing', so 'next' will report 'Empty' without touching the handle.
    when (isNothing page) $
      close reader
    return reader
  where
    -- Load the page that holds the first entry to return, together with the
    -- index of that entry within the page.
    seekFirstEntry :: FS.Handle h -> m (Maybe RawPage, Word16)
    seekFirstEntry kopsHandle =
        case offsetKey of
          NoOffsetKey -> do
            -- Load first page from disk, if it exists.
            firstPage <- readDiskPage readerHasFS kopsHandle
            return (firstPage, 0)
          OffsetKey offset -> do
            -- Use the index to find the page number for the key (if it exists).
            let PageSpan pageNo pageEnd = Index.search offset index
            seekToDiskPage readerHasFS pageNo kopsHandle
            readDiskPage readerHasFS kopsHandle >>= \case
              Nothing ->
                return (Nothing, 0)
              Just foundPage -> do
                case rawPageFindKey foundPage offset of
                  Just n ->
                    -- Found an appropriate index within the index's page.
                    return (Just foundPage, n)

                  _ -> do
                    -- The index said that the key, if it were to exist, would
                    -- live on pageNo, but then rawPageFindKey tells us that in
                    -- fact there is no key greater than or equal to the given
                    -- offset on this page.
                    -- This tells us that the key does not exist, but that if it
                    -- were to exist, it would be between the last key in this
                    -- page and the first key in the next page.
                    -- Thus the reader should be initialised to return keys
                    -- starting from the next (non-overflow) page.
                    seekToDiskPage readerHasFS (nextPageNo pageEnd) kopsHandle
                    nextPage <- readDiskPage readerHasFS kopsHandle
                    return (nextPage, 0)

{-# SPECIALISE close ::
     RunReader IO h
  -> IO () #-}
-- | This function should be called when discarding a 'RunReader' before it
-- was done (i.e. returned 'Empty'). This avoids leaking file handles.
-- Once it has been called, do not use the reader any more!
--
-- Closes the reader's k\/ops file handle and releases its reference to the
-- blob file. If the run was created with 'Run.NoCacheRunData', the k\/ops file
-- is first dropped from the OS page cache.
close ::
     (MonadSTM m, MonadMask m, PrimMonad m)
  => RunReader m h
  -> m ()
close RunReader{..} = do
    when (readerRunDataCaching == Run.NoCacheRunData) $
      -- drop the file from the OS page cache
      FS.hDropCacheAll readerHasBlockIO readerKOpsHandle
    FS.hClose readerHasFS readerKOpsHandle
    releaseRef readerBlobFile
    --TODO: arguably we should have distinct finish and close and require that
    -- readers are _always_ closed, even after they have been drained.
    -- This would allow BlobRefs to remain valid until the reader is closed.
    -- Currently they are invalidated as soon as the cursor is drained.

-- | The 'SerialisedKey' and 'SerialisedValue' point into the in-memory disk
-- page. Keeping them alive will also prevent garbage collection of the 4k byte
-- array, so if they're long-lived, consider making a copy!
data Result m h
  = Empty
    -- ^ There are no more entries; 'next' closes the reader when it reaches
    -- the end of the file.
  | ReadEntry !SerialisedKey !(Entry m h)
    -- ^ The next key together with its entry.

-- | An entry as returned by 'next', either read from the current page alone
-- or accompanied by the overflow pages holding the rest of its value.
data Entry m h =
    -- | An entry taken directly from the current page, without overflow
    -- pages.
    Entry
      !(E.Entry SerialisedValue (RawBlobRef m h))
  | -- | A large entry. The caller might be interested in various different
    -- (redundant) representations, so we return all of them.
    EntryOverflow
      -- | The value is just a prefix, with the remainder in the overflow pages.
      !(E.Entry SerialisedValue (RawBlobRef m h))
      -- | A page containing the single entry (or rather its prefix).
      !RawPage
      -- | Non-zero length of the overflow in bytes.
      !Word32
      -- | The overflow pages containing the suffix of the value (so at least
      -- the number of bytes specified above).
      --
      -- TODO(optimise): Sometimes, reading the overflow pages is not necessary.
      -- We could just return the page index and offer a separate function to do
      -- the disk I/O once needed.
      ![RawOverflowPage]

-- | Smart constructor for 'EntryOverflow' that checks (via 'assert') that the
-- arguments are consistent with each other:
--
-- * the overflow length is non-zero;
--
-- * the number of overflow pages recorded in the page matches the overflow
--   length rounded up to whole pages;
--
-- * the number of overflow pages recorded in the page matches the number of
--   overflow pages supplied.
mkEntryOverflow ::
     E.Entry SerialisedValue (RawBlobRef m h)
  -> RawPage
  -> Word32
  -> [RawOverflowPage]
  -> Entry m h
mkEntryOverflow entryPrefix page len overflowPages =
    assert (len > 0) $
    assert (rawPageOverflowPages page == ceilDivPageSize (fromIntegral len)) $
    assert (rawPageOverflowPages page == length overflowPages) $
      EntryOverflow entryPrefix page len overflowPages

{-# INLINE toFullEntry #-}
-- | Convert a reader 'Entry' into a plain 'E.Entry' carrying the complete
-- value. For an 'EntryOverflow' the value prefix is extended with the bytes
-- from the overflow pages (see 'appendOverflow'); an ordinary 'Entry' is
-- returned unchanged.
toFullEntry :: Entry m h -> E.Entry SerialisedValue (RawBlobRef m h)
toFullEntry = \case
    Entry e ->
      e
    EntryOverflow prefix _ len overflowPages ->
      first (appendOverflow len overflowPages) prefix

{-# INLINE appendOverflow #-}
-- | Reassemble a full value from its prefix and its overflow pages.
--
-- The prefix is concatenated with the bytes of every overflow page, and the
-- result is truncated to the size of the prefix plus the given overflow
-- length. The truncation discards whatever bytes of the last overflow page lie
-- beyond the end of the value.
appendOverflow :: Word32 -> [RawOverflowPage] -> SerialisedValue -> SerialisedValue
appendOverflow len overflowPages (SerialisedValue prefix) =
    SerialisedValue $
      RB.take (RB.size prefix + fromIntegral len) $
        mconcat (prefix : map rawOverflowPageRawBytes overflowPages)

{-# SPECIALISE next ::
     RunReader IO h
  -> IO (Result IO h) #-}
-- | Return the next entry of the run, loading the next disk page when the
-- current one is exhausted.
--
-- Stop using the 'RunReader' after getting 'Empty', because the 'RunReader' is
-- automatically closed!
next :: forall m h.
     (MonadMask m, MonadSTM m, MonadST m)
  => RunReader m h
  -> m (Result m h)
next reader@RunReader {..} = do
    readMutVar readerCurrentPage >>= \case
      Nothing ->
        -- no current page: the reader has already been closed
        return Empty
      Just page -> do
        entryNo <- readPrimVar readerCurrentEntryNo
        go entryNo page
  where
    go :: Word16 -> RawPage -> m (Result m h)
    go !entryNo !page =
        -- take entry from current page (resolve blob if necessary)
        case rawPageIndex page entryNo of
          IndexNotPresent -> do
            -- if it is past the last one, load a new page from disk, try again
            newPage <- readDiskPage readerHasFS readerKOpsHandle
            stToIO $ writeMutVar readerCurrentPage newPage
            case newPage of
              Nothing -> do
                -- end of file: the current page is now 'Nothing', so later
                -- calls return 'Empty' without touching the closed handle
                close reader
                return Empty
              Just p -> do
                writePrimVar readerCurrentEntryNo 0
                go 0 p  -- try again on the new page
          IndexEntry key entry -> do
            modifyPrimVar readerCurrentEntryNo (+1)
            let entry' = fmap (BlobRef.mkRawBlobRef readerBlobFile) entry
            let rawEntry = Entry entry'
            return (ReadEntry key rawEntry)
          IndexEntryOverflow key entry lenSuffix -> do
            -- TODO: we know that we need the next page, could already load?
            modifyPrimVar readerCurrentEntryNo (+1)
            let entry' :: E.Entry SerialisedValue (RawBlobRef m h)
                entry' = fmap (BlobRef.mkRawBlobRef readerBlobFile) entry
            -- the overflow pages are read through the same handle, which
            -- advances the file position past them
            overflowPages <- readOverflowPages readerHasFS readerKOpsHandle lenSuffix
            let rawEntry = mkEntryOverflow entry' page lenSuffix overflowPages
            return (ReadEntry key rawEntry)

{-------------------------------------------------------------------------------
  Utilities
-------------------------------------------------------------------------------}

-- | Move the file position of the handle to the start of the given page, i.e.
-- to the absolute byte offset obtained by multiplying the page number by the
-- page size. The page number is asserted to be non-negative.
seekToDiskPage :: HasFS m h -> PageNo -> FS.Handle h -> m ()
seekToDiskPage fs pageNo h = do
    FS.hSeek fs h FS.AbsoluteSeek (pageNoToByteOffset pageNo)
  where
    pageNoToByteOffset (PageNo n) =
        assert (n >= 0) $
          mulPageSize (fromIntegral n)

{-# SPECIALISE readDiskPage ::
     HasFS IO h
  -> FS.Handle h
  -> IO (Maybe RawPage) #-}
-- | Read one full page ('pageSize' bytes) from the current position of the
-- handle into a freshly allocated pinned buffer.
--
-- Returns 'Nothing' on EOF.
readDiskPage ::
     (MonadCatch m, PrimMonad m)
  => HasFS m h
  -> FS.Handle h
  -> m (Maybe RawPage)
readDiskPage fs h = do
    mba <- newPinnedByteArray pageSize
    -- TODO: make sure no other exception type can be thrown
    --
    -- TODO: if FS.FsReachedEOF is thrown as an injected disk fault, then we
    -- incorrectly deduce that the file has no more contents. We should probably
    -- use an explicit file pointer instead in the style of 'FilePointer'.
    handleJust (guard . FS.isFsErrorType FS.FsReachedEOF) (\_ -> pure Nothing) $ do
      bytesRead <- FS.hGetBufExactly fs h mba 0 (fromIntegral pageSize)
      assert (fromIntegral bytesRead == pageSize) $ pure ()
      -- the buffer is not written to after this point, so freezing it without
      -- a copy is fine
      ba <- unsafeFreezeByteArray mba
      let !rawPage = unsafeMakeRawPage ba 0
      return (Just rawPage)

-- | Size in bytes of a disk page in the k\/ops file.
pageSize :: Int
pageSize = 4096

{-# SPECIALISE readOverflowPages ::
     HasFS IO h
  -> FS.Handle h
  -> Word32
  -> IO [RawOverflowPage] #-}
-- | Read the overflow pages holding a value suffix of the given length (in
-- bytes), starting at the current position of the handle.
--
-- Throws exception on EOF. If a suffix was expected, the file should have it.
-- Reads full pages, despite the suffix only using part of the last page.
readOverflowPages ::
     (MonadSTM m, MonadThrow m, PrimMonad m)
   => HasFS m h
   -> FS.Handle h
   -> Word32
   -> m [RawOverflowPage]
readOverflowPages fs h len = do
    -- Always read whole pages. The length is widened to 'Int' before rounding
    -- so that rounding up cannot wrap around in 'Word32' for lengths within
    -- one page of the 'Word32' maximum.
    let lenPages :: Int
        lenPages = roundUpToPageSize (fromIntegral len)
    mba <- newPinnedByteArray lenPages
    _ <- FS.hGetBufExactly fs h mba 0 (fromIntegral lenPages)
    ba <- unsafeFreezeByteArray mba
    -- should not copy since 'ba' is pinned and its length is a multiple of 4k.
    return $ pinnedByteArrayToOverflowPages 0 lenPages ba