{-# LANGUAGE CPP #-}
{-# OPTIONS_HADDOCK not-home #-}

module Database.LSMTree.Internal.Lookup (
    ResolveSerialisedValue
  , LookupAcc
  , lookupsIOWithWriteBuffer
  , lookupsIO
    -- * Errors
  , TableCorruptedError (..)
    -- * Internal: exposed for tests and benchmarks
  , RunIx
  , KeyIx
  , RunIxKeyIx(..)
  , prepLookups
  , bloomQueries
  , indexSearches
  , intraPageLookupsWithWriteBuffer
  , intraPageLookupsOn
  ) where

import           Data.Bifunctor
import           Data.BloomFilter (Bloom)
import           Data.Primitive.ByteArray
import qualified Data.Vector as V
import qualified Data.Vector.Mutable as VM
import qualified Data.Vector.Primitive as VP
import qualified Data.Vector.Unboxed as VU
import           Database.LSMTree.Internal.Arena (Arena, ArenaManager,
                     allocateFromArena, withArena)

import           Control.Exception (assert)
import           Control.Monad
import           Control.Monad.Class.MonadST as ST
import           Control.Monad.Class.MonadThrow (Exception, MonadThrow (..))
import           Control.Monad.Primitive
import           Control.Monad.ST.Strict
import           Control.RefCount

import           Database.LSMTree.Internal.BlobRef (WeakBlobRef (..))
import           Database.LSMTree.Internal.Entry
import           Database.LSMTree.Internal.Index (Index)
import qualified Database.LSMTree.Internal.Index as Index (search)
import           Database.LSMTree.Internal.Page (PageSpan (..), getNumPages,
                     pageSpanSize, unPageNo)
import           Database.LSMTree.Internal.RawBytes (RawBytes (..))
import qualified Database.LSMTree.Internal.RawBytes as RB
import           Database.LSMTree.Internal.RawPage
import           Database.LSMTree.Internal.Run (Run)
import qualified Database.LSMTree.Internal.Run as Run
import           Database.LSMTree.Internal.Serialise
import qualified Database.LSMTree.Internal.Vector as V
import qualified Database.LSMTree.Internal.WriteBuffer as WB
import qualified Database.LSMTree.Internal.WriteBufferBlobs as WBB
import           System.FS.API (BufferOffset (..), Handle)
import           System.FS.BlockIO.API

#ifdef BLOOM_QUERY_FAST
import           Database.LSMTree.Internal.BloomFilterQuery2 (RunIxKeyIx (..),
                     bloomQueries)
#else
import           Database.LSMTree.Internal.BloomFilterQuery1 (RunIxKeyIx (..),
                     bloomQueries)
#endif

-- | Prepare disk lookups by doing bloom filter queries, index searches and
-- creating 'IOOp's. The result is a vector of 'IOOp's and a vector of indexes,
-- both of which are the same length. The indexes record the run and key
-- associated with each 'IOOp'.
prepLookups ::
     Arena s
  -> V.Vector (Bloom SerialisedKey)
  -> V.Vector Index
  -> V.Vector (Handle h)
  -> V.Vector SerialisedKey
  -> ST s (VP.Vector RunIxKeyIx, V.Vector (IOOp s h))
prepLookups :: forall s h.
Arena s
-> Vector (Bloom SerialisedKey)
-> Vector Index
-> Vector (Handle h)
-> Vector SerialisedKey
-> ST s (Vector RunIxKeyIx, Vector (IOOp s h))
prepLookups Arena s
arena Vector (Bloom SerialisedKey)
blooms Vector Index
indexes Vector (Handle h)
kopsFiles Vector SerialisedKey
ks = do
  let !rkixs :: Vector RunIxKeyIx
rkixs = Vector (Bloom SerialisedKey)
-> Vector SerialisedKey -> Vector RunIxKeyIx
bloomQueries Vector (Bloom SerialisedKey)
blooms Vector SerialisedKey
ks
  !Vector (IOOp s h)
ioops <- Arena s
-> Vector Index
-> Vector (Handle h)
-> Vector SerialisedKey
-> Vector RunIxKeyIx
-> ST s (Vector (IOOp s h))
forall s h.
Arena s
-> Vector Index
-> Vector (Handle h)
-> Vector SerialisedKey
-> Vector RunIxKeyIx
-> ST s (Vector (IOOp s h))
indexSearches Arena s
arena Vector Index
indexes Vector (Handle h)
kopsFiles Vector SerialisedKey
ks Vector RunIxKeyIx
rkixs
  (Vector RunIxKeyIx, Vector (IOOp s h))
-> ST s (Vector RunIxKeyIx, Vector (IOOp s h))
forall a. a -> ST s a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Vector RunIxKeyIx
rkixs, Vector (IOOp s h)
ioops)

type KeyIx = Int
type RunIx = Int

-- | Perform a batch of fence pointer index searches, and create an 'IOOp' for
-- each search result. The resulting vector has the same length as the
-- @VP.Vector RunIxKeyIx@ argument, because index searching always returns a
-- positive search result.
indexSearches ::
     Arena s
  -> V.Vector Index
  -> V.Vector (Handle h)
  -> V.Vector SerialisedKey
  -> VP.Vector RunIxKeyIx -- ^ Result of 'bloomQueries'
  -> ST s (V.Vector (IOOp s h))
indexSearches :: forall s h.
Arena s
-> Vector Index
-> Vector (Handle h)
-> Vector SerialisedKey
-> Vector RunIxKeyIx
-> ST s (Vector (IOOp s h))
indexSearches !Arena s
arena !Vector Index
indexes !Vector (Handle h)
kopsFiles !Vector SerialisedKey
ks !Vector RunIxKeyIx
rkixs = Size -> (Size -> ST s (IOOp s h)) -> ST s (Vector (IOOp s h))
forall (m :: * -> *) a.
Monad m =>
Size -> (Size -> m a) -> m (Vector a)
V.generateM Size
n ((Size -> ST s (IOOp s h)) -> ST s (Vector (IOOp s h)))
-> (Size -> ST s (IOOp s h)) -> ST s (Vector (IOOp s h))
forall a b. (a -> b) -> a -> b
$ \Size
i -> do
    let (RunIxKeyIx !Size
rix !Size
kix)
                     = Vector RunIxKeyIx
rkixs Vector RunIxKeyIx -> Size -> RunIxKeyIx
forall a. Prim a => Vector a -> Size -> a
`VP.unsafeIndex` Size
i
        !c :: Index
c           = Vector Index
indexes Vector Index -> Size -> Index
forall a. Vector a -> Size -> a
`V.unsafeIndex` Size
rix
        !h :: Handle h
h           = Vector (Handle h)
kopsFiles Vector (Handle h) -> Size -> Handle h
forall a. Vector a -> Size -> a
`V.unsafeIndex` Size
rix
        !k :: SerialisedKey
k           = Vector SerialisedKey
ks Vector SerialisedKey -> Size -> SerialisedKey
forall a. Vector a -> Size -> a
`V.unsafeIndex` Size
kix
        !pspan :: PageSpan
pspan       = SerialisedKey -> Index -> PageSpan
Index.search SerialisedKey
k Index
c
        !size :: NumPages
size        = PageSpan -> NumPages
pageSpanSize PageSpan
pspan
    -- Acquire a reusable buffer
    (!Size
off, !MutableByteArray s
buf) <- Arena (PrimState (ST s))
-> Size -> Size -> ST s (Size, MutableByteArray (PrimState (ST s)))
forall (m :: * -> *).
PrimMonad m =>
Arena (PrimState m)
-> Size -> Size -> m (Size, MutableByteArray (PrimState m))
allocateFromArena Arena s
Arena (PrimState (ST s))
arena (NumPages -> Size
forall i. Integral i => NumPages -> i
getNumPages NumPages
size Size -> Size -> Size
forall a. Num a => a -> a -> a
* Size
4096) Size
4096
    IOOp s h -> ST s (IOOp s h)
forall a. a -> ST s a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (IOOp s h -> ST s (IOOp s h)) -> IOOp s h -> ST s (IOOp s h)
forall a b. (a -> b) -> a -> b
$! Handle h
-> FileOffset
-> MutableByteArray s
-> BufferOffset
-> ByteCount
-> IOOp s h
forall s h.
Handle h
-> FileOffset
-> MutableByteArray s
-> BufferOffset
-> ByteCount
-> IOOp s h
IOOpRead
              Handle h
h
              (Size -> FileOffset
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Size -> FileOffset) -> Size -> FileOffset
forall a b. (a -> b) -> a -> b
$ PageNo -> Size
unPageNo (PageSpan -> PageNo
pageSpanStart PageSpan
pspan) Size -> Size -> Size
forall a. Num a => a -> a -> a
* Size
4096)
              MutableByteArray s
buf
              (Size -> BufferOffset
forall a b. (Integral a, Num b) => a -> b
fromIntegral Size
off)
              (NumPages -> ByteCount
forall i. Integral i => NumPages -> i
getNumPages NumPages
size ByteCount -> ByteCount -> ByteCount
forall a. Num a => a -> a -> a
* ByteCount
4096)
  where
    !n :: Size
n = Vector RunIxKeyIx -> Size
forall a. Prim a => Vector a -> Size
VP.length Vector RunIxKeyIx
rkixs

-- | Value resolve function: what to do when resolving two @Mupdate@s
type ResolveSerialisedValue = SerialisedValue -> SerialisedValue -> SerialisedValue

type LookupAcc m h = V.Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))

{-# SPECIALIZE lookupsIOWithWriteBuffer ::
       HasBlockIO IO h
    -> ArenaManager RealWorld
    -> ResolveSerialisedValue
    -> WB.WriteBuffer
    -> Ref (WBB.WriteBufferBlobs IO h)
    -> V.Vector (Ref (Run IO h))
    -> V.Vector (Bloom SerialisedKey)
    -> V.Vector Index
    -> V.Vector (Handle h)
    -> V.Vector SerialisedKey
    -> IO (LookupAcc IO h)
  #-}
-- | Like 'lookupsIO', but takes a write buffer into account.
lookupsIOWithWriteBuffer ::
     forall m h. (MonadThrow m, MonadST m)
  => HasBlockIO m h
  -> ArenaManager (PrimState m)
  -> ResolveSerialisedValue
  -> WB.WriteBuffer
  -> Ref (WBB.WriteBufferBlobs m h)
  -> V.Vector (Ref (Run m h)) -- ^ Runs @rs@
  -> V.Vector (Bloom SerialisedKey) -- ^ The bloom filters inside @rs@
  -> V.Vector Index -- ^ The indexes inside @rs@
  -> V.Vector (Handle h) -- ^ The file handles to the key\/value files inside @rs@
  -> V.Vector SerialisedKey
  -> m (LookupAcc m h)
lookupsIOWithWriteBuffer :: forall (m :: * -> *) h.
(MonadThrow m, MonadST m) =>
HasBlockIO m h
-> ArenaManager (PrimState m)
-> ResolveSerialisedValue
-> WriteBuffer
-> Ref (WriteBufferBlobs m h)
-> Vector (Ref (Run m h))
-> Vector (Bloom SerialisedKey)
-> Vector Index
-> Vector (Handle h)
-> Vector SerialisedKey
-> m (LookupAcc m h)
lookupsIOWithWriteBuffer !HasBlockIO m h
hbio !ArenaManager (PrimState m)
mgr !ResolveSerialisedValue
resolveV !WriteBuffer
wb !Ref (WriteBufferBlobs m h)
wbblobs !Vector (Ref (Run m h))
rs !Vector (Bloom SerialisedKey)
blooms !Vector Index
indexes !Vector (Handle h)
kopsFiles !Vector SerialisedKey
ks =
    Bool -> m (LookupAcc m h) -> m (LookupAcc m h)
forall a. (?callStack::CallStack) => Bool -> a -> a
assert Bool
precondition (m (LookupAcc m h) -> m (LookupAcc m h))
-> m (LookupAcc m h) -> m (LookupAcc m h)
forall a b. (a -> b) -> a -> b
$
    ArenaManager (PrimState m)
-> (Arena (PrimState m) -> m (LookupAcc m h)) -> m (LookupAcc m h)
forall (m :: * -> *) a.
PrimMonad m =>
ArenaManager (PrimState m) -> (Arena (PrimState m) -> m a) -> m a
withArena ArenaManager (PrimState m)
mgr ((Arena (PrimState m) -> m (LookupAcc m h)) -> m (LookupAcc m h))
-> (Arena (PrimState m) -> m (LookupAcc m h)) -> m (LookupAcc m h)
forall a b. (a -> b) -> a -> b
$ \Arena (PrimState m)
arena -> do
      (Vector RunIxKeyIx
rkixs, Vector (IOOp (PrimState m) h)
ioops) <- ST (PrimState m) (Vector RunIxKeyIx, Vector (IOOp (PrimState m) h))
-> m (Vector RunIxKeyIx, Vector (IOOp (PrimState m) h))
forall a. ST (PrimState m) a -> m a
forall (m :: * -> *) a. MonadST m => ST (PrimState m) a -> m a
ST.stToIO (ST
   (PrimState m) (Vector RunIxKeyIx, Vector (IOOp (PrimState m) h))
 -> m (Vector RunIxKeyIx, Vector (IOOp (PrimState m) h)))
-> ST
     (PrimState m) (Vector RunIxKeyIx, Vector (IOOp (PrimState m) h))
-> m (Vector RunIxKeyIx, Vector (IOOp (PrimState m) h))
forall a b. (a -> b) -> a -> b
$ Arena (PrimState m)
-> Vector (Bloom SerialisedKey)
-> Vector Index
-> Vector (Handle h)
-> Vector SerialisedKey
-> ST
     (PrimState m) (Vector RunIxKeyIx, Vector (IOOp (PrimState m) h))
forall s h.
Arena s
-> Vector (Bloom SerialisedKey)
-> Vector Index
-> Vector (Handle h)
-> Vector SerialisedKey
-> ST s (Vector RunIxKeyIx, Vector (IOOp s h))
prepLookups Arena (PrimState m)
arena Vector (Bloom SerialisedKey)
blooms Vector Index
indexes Vector (Handle h)
kopsFiles Vector SerialisedKey
ks
      Vector IOResult
ioress <- HasBlockIO m h
-> (?callStack::CallStack) =>
   Vector (IOOp (PrimState m) h) -> m (Vector IOResult)
forall (m :: * -> *) h.
HasBlockIO m h
-> (?callStack::CallStack) =>
   Vector (IOOp (PrimState m) h) -> m (Vector IOResult)
submitIO HasBlockIO m h
hbio Vector (IOOp (PrimState m) h)
ioops
      ResolveSerialisedValue
-> WriteBuffer
-> Ref (WriteBufferBlobs m h)
-> Vector (Ref (Run m h))
-> Vector SerialisedKey
-> Vector RunIxKeyIx
-> Vector (IOOp (PrimState m) h)
-> Vector IOResult
-> m (LookupAcc m h)
forall (m :: * -> *) h.
(PrimMonad m, MonadThrow m) =>
ResolveSerialisedValue
-> WriteBuffer
-> Ref (WriteBufferBlobs m h)
-> Vector (Ref (Run m h))
-> Vector SerialisedKey
-> Vector RunIxKeyIx
-> Vector (IOOp (PrimState m) h)
-> Vector IOResult
-> m (LookupAcc m h)
intraPageLookupsWithWriteBuffer ResolveSerialisedValue
resolveV WriteBuffer
wb Ref (WriteBufferBlobs m h)
wbblobs Vector (Ref (Run m h))
rs Vector SerialisedKey
ks Vector RunIxKeyIx
rkixs Vector (IOOp (PrimState m) h)
ioops Vector IOResult
ioress
  where
    -- we check only that the lengths match, because checking the contents is
    -- too expensive.
    precondition :: Bool
precondition =
      Bool -> Bool -> Bool
forall a. (?callStack::CallStack) => Bool -> a -> a
assert (Vector (Ref (Run m h)) -> Size
forall a. Vector a -> Size
V.length Vector (Ref (Run m h))
rs Size -> Size -> Bool
forall a. Eq a => a -> a -> Bool
== Vector (Bloom SerialisedKey) -> Size
forall a. Vector a -> Size
V.length Vector (Bloom SerialisedKey)
blooms) (Bool -> Bool) -> Bool -> Bool
forall a b. (a -> b) -> a -> b
$
      Bool -> Bool -> Bool
forall a. (?callStack::CallStack) => Bool -> a -> a
assert (Vector (Ref (Run m h)) -> Size
forall a. Vector a -> Size
V.length Vector (Ref (Run m h))
rs Size -> Size -> Bool
forall a. Eq a => a -> a -> Bool
== Vector Index -> Size
forall a. Vector a -> Size
V.length Vector Index
indexes) (Bool -> Bool) -> Bool -> Bool
forall a b. (a -> b) -> a -> b
$
      Bool -> Bool -> Bool
forall a. (?callStack::CallStack) => Bool -> a -> a
assert (Vector (Ref (Run m h)) -> Size
forall a. Vector a -> Size
V.length Vector (Ref (Run m h))
rs Size -> Size -> Bool
forall a. Eq a => a -> a -> Bool
== Vector (Handle h) -> Size
forall a. Vector a -> Size
V.length Vector (Handle h)
kopsFiles) (Bool -> Bool) -> Bool -> Bool
forall a b. (a -> b) -> a -> b
$
      Bool
True

{-# SPECIALIZE lookupsIO ::
       HasBlockIO IO h
    -> ArenaManager RealWorld
    -> ResolveSerialisedValue
    -> V.Vector (Ref (Run IO h))
    -> V.Vector (Bloom SerialisedKey)
    -> V.Vector Index
    -> V.Vector (Handle h)
    -> V.Vector SerialisedKey
    -> IO (LookupAcc IO h)
  #-}
-- | Batched lookups in I\/O.
--
-- PRECONDITION: the vectors of bloom filters, indexes and file handles
-- should pointwise match with the vectors of runs.
lookupsIO ::
     forall m h. (MonadThrow m, MonadST m)
  => HasBlockIO m h
  -> ArenaManager (PrimState m)
  -> ResolveSerialisedValue
  -> V.Vector (Ref (Run m h)) -- ^ Runs @rs@
  -> V.Vector (Bloom SerialisedKey) -- ^ The bloom filters inside @rs@
  -> V.Vector Index -- ^ The indexes inside @rs@
  -> V.Vector (Handle h) -- ^ The file handles to the key\/value files inside @rs@
  -> V.Vector SerialisedKey
  -> m (LookupAcc m h)
lookupsIO :: forall (m :: * -> *) h.
(MonadThrow m, MonadST m) =>
HasBlockIO m h
-> ArenaManager (PrimState m)
-> ResolveSerialisedValue
-> Vector (Ref (Run m h))
-> Vector (Bloom SerialisedKey)
-> Vector Index
-> Vector (Handle h)
-> Vector SerialisedKey
-> m (LookupAcc m h)
lookupsIO !HasBlockIO m h
hbio !ArenaManager (PrimState m)
mgr !ResolveSerialisedValue
resolveV !Vector (Ref (Run m h))
rs !Vector (Bloom SerialisedKey)
blooms !Vector Index
indexes !Vector (Handle h)
kopsFiles !Vector SerialisedKey
ks =
    Bool -> m (LookupAcc m h) -> m (LookupAcc m h)
forall a. (?callStack::CallStack) => Bool -> a -> a
assert Bool
precondition (m (LookupAcc m h) -> m (LookupAcc m h))
-> m (LookupAcc m h) -> m (LookupAcc m h)
forall a b. (a -> b) -> a -> b
$
    ArenaManager (PrimState m)
-> (Arena (PrimState m) -> m (LookupAcc m h)) -> m (LookupAcc m h)
forall (m :: * -> *) a.
PrimMonad m =>
ArenaManager (PrimState m) -> (Arena (PrimState m) -> m a) -> m a
withArena ArenaManager (PrimState m)
mgr ((Arena (PrimState m) -> m (LookupAcc m h)) -> m (LookupAcc m h))
-> (Arena (PrimState m) -> m (LookupAcc m h)) -> m (LookupAcc m h)
forall a b. (a -> b) -> a -> b
$ \Arena (PrimState m)
arena -> do
      (Vector RunIxKeyIx
rkixs, Vector (IOOp (PrimState m) h)
ioops) <- ST (PrimState m) (Vector RunIxKeyIx, Vector (IOOp (PrimState m) h))
-> m (Vector RunIxKeyIx, Vector (IOOp (PrimState m) h))
forall a. ST (PrimState m) a -> m a
forall (m :: * -> *) a. MonadST m => ST (PrimState m) a -> m a
ST.stToIO (ST
   (PrimState m) (Vector RunIxKeyIx, Vector (IOOp (PrimState m) h))
 -> m (Vector RunIxKeyIx, Vector (IOOp (PrimState m) h)))
-> ST
     (PrimState m) (Vector RunIxKeyIx, Vector (IOOp (PrimState m) h))
-> m (Vector RunIxKeyIx, Vector (IOOp (PrimState m) h))
forall a b. (a -> b) -> a -> b
$ Arena (PrimState m)
-> Vector (Bloom SerialisedKey)
-> Vector Index
-> Vector (Handle h)
-> Vector SerialisedKey
-> ST
     (PrimState m) (Vector RunIxKeyIx, Vector (IOOp (PrimState m) h))
forall s h.
Arena s
-> Vector (Bloom SerialisedKey)
-> Vector Index
-> Vector (Handle h)
-> Vector SerialisedKey
-> ST s (Vector RunIxKeyIx, Vector (IOOp s h))
prepLookups Arena (PrimState m)
arena Vector (Bloom SerialisedKey)
blooms Vector Index
indexes Vector (Handle h)
kopsFiles Vector SerialisedKey
ks
      Vector IOResult
ioress <- HasBlockIO m h
-> (?callStack::CallStack) =>
   Vector (IOOp (PrimState m) h) -> m (Vector IOResult)
forall (m :: * -> *) h.
HasBlockIO m h
-> (?callStack::CallStack) =>
   Vector (IOOp (PrimState m) h) -> m (Vector IOResult)
submitIO HasBlockIO m h
hbio Vector (IOOp (PrimState m) h)
ioops
      ResolveSerialisedValue
-> LookupAcc m h
-> Vector (Ref (Run m h))
-> Vector SerialisedKey
-> Vector RunIxKeyIx
-> Vector (IOOp (PrimState m) h)
-> Vector IOResult
-> m (LookupAcc m h)
forall (m :: * -> *) h.
(PrimMonad m, MonadThrow m) =>
ResolveSerialisedValue
-> LookupAcc m h
-> Vector (Ref (Run m h))
-> Vector SerialisedKey
-> Vector RunIxKeyIx
-> Vector (IOOp (PrimState m) h)
-> Vector IOResult
-> m (LookupAcc m h)
intraPageLookupsOn ResolveSerialisedValue
resolveV ((SerialisedKey -> Maybe (Entry SerialisedValue (WeakBlobRef m h)))
-> Vector SerialisedKey -> LookupAcc m h
forall a b. (a -> b) -> Vector a -> Vector b
V.map (Maybe (Entry SerialisedValue (WeakBlobRef m h))
-> SerialisedKey -> Maybe (Entry SerialisedValue (WeakBlobRef m h))
forall a b. a -> b -> a
const Maybe (Entry SerialisedValue (WeakBlobRef m h))
forall a. Maybe a
Nothing) Vector SerialisedKey
ks) Vector (Ref (Run m h))
rs Vector SerialisedKey
ks Vector RunIxKeyIx
rkixs Vector (IOOp (PrimState m) h)
ioops Vector IOResult
ioress
  where
    -- we check only that the lengths match, because checking the contents is
    -- too expensive.
    precondition :: Bool
precondition =
      Bool -> Bool -> Bool
forall a. (?callStack::CallStack) => Bool -> a -> a
assert (Vector (Ref (Run m h)) -> Size
forall a. Vector a -> Size
V.length Vector (Ref (Run m h))
rs Size -> Size -> Bool
forall a. Eq a => a -> a -> Bool
== Vector (Bloom SerialisedKey) -> Size
forall a. Vector a -> Size
V.length Vector (Bloom SerialisedKey)
blooms) (Bool -> Bool) -> Bool -> Bool
forall a b. (a -> b) -> a -> b
$
      Bool -> Bool -> Bool
forall a. (?callStack::CallStack) => Bool -> a -> a
assert (Vector (Ref (Run m h)) -> Size
forall a. Vector a -> Size
V.length Vector (Ref (Run m h))
rs Size -> Size -> Bool
forall a. Eq a => a -> a -> Bool
== Vector Index -> Size
forall a. Vector a -> Size
V.length Vector Index
indexes) (Bool -> Bool) -> Bool -> Bool
forall a b. (a -> b) -> a -> b
$
      Bool -> Bool -> Bool
forall a. (?callStack::CallStack) => Bool -> a -> a
assert (Vector (Ref (Run m h)) -> Size
forall a. Vector a -> Size
V.length Vector (Ref (Run m h))
rs Size -> Size -> Bool
forall a. Eq a => a -> a -> Bool
== Vector (Handle h) -> Size
forall a. Vector a -> Size
V.length Vector (Handle h)
kopsFiles) (Bool -> Bool) -> Bool -> Bool
forall a b. (a -> b) -> a -> b
$
      Bool
True

{-# SPECIALIZE intraPageLookupsWithWriteBuffer ::
       ResolveSerialisedValue
    -> WB.WriteBuffer
    -> Ref (WBB.WriteBufferBlobs IO h)
    -> V.Vector (Ref (Run IO h))
    -> V.Vector SerialisedKey
    -> VP.Vector RunIxKeyIx
    -> V.Vector (IOOp RealWorld h)
    -> VU.Vector IOResult
    -> IO (LookupAcc IO h)
  #-}
-- | Like 'intraPageLookupsOn', but uses the write buffer as the initial
-- accumulator.
--
intraPageLookupsWithWriteBuffer ::
     forall m h. (PrimMonad m, MonadThrow m)
  => ResolveSerialisedValue
  -> WB.WriteBuffer
  -> Ref (WBB.WriteBufferBlobs m h)
  -> V.Vector (Ref (Run m h))
  -> V.Vector SerialisedKey
  -> VP.Vector RunIxKeyIx
  -> V.Vector (IOOp (PrimState m) h)
  -> VU.Vector IOResult
  -> m (LookupAcc m h)
intraPageLookupsWithWriteBuffer :: forall (m :: * -> *) h.
(PrimMonad m, MonadThrow m) =>
ResolveSerialisedValue
-> WriteBuffer
-> Ref (WriteBufferBlobs m h)
-> Vector (Ref (Run m h))
-> Vector SerialisedKey
-> Vector RunIxKeyIx
-> Vector (IOOp (PrimState m) h)
-> Vector IOResult
-> m (LookupAcc m h)
intraPageLookupsWithWriteBuffer !ResolveSerialisedValue
resolveV !WriteBuffer
wb !Ref (WriteBufferBlobs m h)
wbblobs !Vector (Ref (Run m h))
rs !Vector SerialisedKey
ks !Vector RunIxKeyIx
rkixs !Vector (IOOp (PrimState m) h)
ioops !Vector IOResult
ioress = do
    -- The most recent values are in the write buffer, so we use it to
    -- initialise the accumulator.
    LookupAcc m h
acc0 <-
      Size
-> (Size -> m (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
-> m (LookupAcc m h)
forall (m :: * -> *) a.
Monad m =>
Size -> (Size -> m a) -> m (Vector a)
V.generateM (Vector SerialisedKey -> Size
forall a. Vector a -> Size
V.length Vector SerialisedKey
ks) ((Size -> m (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
 -> m (LookupAcc m h))
-> (Size -> m (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
-> m (LookupAcc m h)
forall a b. (a -> b) -> a -> b
$ \Size
ki ->
        case WriteBuffer
-> SerialisedKey -> Maybe (Entry SerialisedValue BlobSpan)
WB.lookup WriteBuffer
wb (Vector SerialisedKey -> Size -> SerialisedKey
forall a. Vector a -> Size -> a
V.unsafeIndex Vector SerialisedKey
ks Size
ki) of
          Maybe (Entry SerialisedValue BlobSpan)
Nothing -> Maybe (Entry SerialisedValue (WeakBlobRef m h))
-> m (Maybe (Entry SerialisedValue (WeakBlobRef m h)))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe (Entry SerialisedValue (WeakBlobRef m h))
forall a. Maybe a
Nothing
          Just Entry SerialisedValue BlobSpan
e  -> Maybe (Entry SerialisedValue (WeakBlobRef m h))
-> m (Maybe (Entry SerialisedValue (WeakBlobRef m h)))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe (Entry SerialisedValue (WeakBlobRef m h))
 -> m (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
-> Maybe (Entry SerialisedValue (WeakBlobRef m h))
-> m (Maybe (Entry SerialisedValue (WeakBlobRef m h)))
forall a b. (a -> b) -> a -> b
$! Entry SerialisedValue (WeakBlobRef m h)
-> Maybe (Entry SerialisedValue (WeakBlobRef m h))
forall a. a -> Maybe a
Just (Entry SerialisedValue (WeakBlobRef m h)
 -> Maybe (Entry SerialisedValue (WeakBlobRef m h)))
-> Entry SerialisedValue (WeakBlobRef m h)
-> Maybe (Entry SerialisedValue (WeakBlobRef m h))
forall a b. (a -> b) -> a -> b
$! (BlobSpan -> WeakBlobRef m h)
-> Entry SerialisedValue BlobSpan
-> Entry SerialisedValue (WeakBlobRef m h)
forall a b.
(a -> b) -> Entry SerialisedValue a -> Entry SerialisedValue b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (Ref (WriteBufferBlobs m h) -> BlobSpan -> WeakBlobRef m h
forall (m :: * -> *) h.
Ref (WriteBufferBlobs m h) -> BlobSpan -> WeakBlobRef m h
WBB.mkWeakBlobRef Ref (WriteBufferBlobs m h)
wbblobs) Entry SerialisedValue BlobSpan
e
          -- TODO:  ^^ we should be able to avoid this allocation by
          -- combining the conversion with other later conversions.
    ResolveSerialisedValue
-> LookupAcc m h
-> Vector (Ref (Run m h))
-> Vector SerialisedKey
-> Vector RunIxKeyIx
-> Vector (IOOp (PrimState m) h)
-> Vector IOResult
-> m (LookupAcc m h)
forall (m :: * -> *) h.
(PrimMonad m, MonadThrow m) =>
ResolveSerialisedValue
-> LookupAcc m h
-> Vector (Ref (Run m h))
-> Vector SerialisedKey
-> Vector RunIxKeyIx
-> Vector (IOOp (PrimState m) h)
-> Vector IOResult
-> m (LookupAcc m h)
intraPageLookupsOn ResolveSerialisedValue
resolveV LookupAcc m h
acc0 Vector (Ref (Run m h))
rs Vector SerialisedKey
ks Vector RunIxKeyIx
rkixs Vector (IOOp (PrimState m) h)
ioops Vector IOResult
ioress

-- | The table data is corrupted.
data TableCorruptedError
    = ErrLookupByteCountDiscrepancy
        -- | Expected byte count.
        !ByteCount
        -- | Actual byte count.
        !ByteCount
    deriving stock (Size -> TableCorruptedError -> ShowS
[TableCorruptedError] -> ShowS
TableCorruptedError -> String
(Size -> TableCorruptedError -> ShowS)
-> (TableCorruptedError -> String)
-> ([TableCorruptedError] -> ShowS)
-> Show TableCorruptedError
forall a.
(Size -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Size -> TableCorruptedError -> ShowS
showsPrec :: Size -> TableCorruptedError -> ShowS
$cshow :: TableCorruptedError -> String
show :: TableCorruptedError -> String
$cshowList :: [TableCorruptedError] -> ShowS
showList :: [TableCorruptedError] -> ShowS
Show, TableCorruptedError -> TableCorruptedError -> Bool
(TableCorruptedError -> TableCorruptedError -> Bool)
-> (TableCorruptedError -> TableCorruptedError -> Bool)
-> Eq TableCorruptedError
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: TableCorruptedError -> TableCorruptedError -> Bool
== :: TableCorruptedError -> TableCorruptedError -> Bool
$c/= :: TableCorruptedError -> TableCorruptedError -> Bool
/= :: TableCorruptedError -> TableCorruptedError -> Bool
Eq)
    deriving anyclass (Show TableCorruptedError
Typeable TableCorruptedError
(Typeable TableCorruptedError, Show TableCorruptedError) =>
(TableCorruptedError -> SomeException)
-> (SomeException -> Maybe TableCorruptedError)
-> (TableCorruptedError -> String)
-> Exception TableCorruptedError
SomeException -> Maybe TableCorruptedError
TableCorruptedError -> String
TableCorruptedError -> SomeException
forall e.
(Typeable e, Show e) =>
(e -> SomeException)
-> (SomeException -> Maybe e) -> (e -> String) -> Exception e
$ctoException :: TableCorruptedError -> SomeException
toException :: TableCorruptedError -> SomeException
$cfromException :: SomeException -> Maybe TableCorruptedError
fromException :: SomeException -> Maybe TableCorruptedError
$cdisplayException :: TableCorruptedError -> String
displayException :: TableCorruptedError -> String
Exception)

{-# SPECIALIZE intraPageLookupsOn ::
       ResolveSerialisedValue
    -> LookupAcc IO h
    -> V.Vector (Ref (Run IO h))
    -> V.Vector SerialisedKey
    -> VP.Vector RunIxKeyIx
    -> V.Vector (IOOp RealWorld h)
    -> VU.Vector IOResult
    -> IO (LookupAcc IO h)
  #-}
-- | Intra-page lookups, and combining lookup results from multiple runs and
-- a potential initial accumulator (e.g. from the write buffer).
--
-- This function assumes that @rkixs@ is ordered such that newer runs are
-- handled first. The order matters for resolving cases where we find the same
-- key in multiple runs.
--
intraPageLookupsOn ::
     forall m h. (PrimMonad m, MonadThrow m)
  => ResolveSerialisedValue
  -> LookupAcc m h  -- initial acc
  -> V.Vector (Ref (Run m h))
  -> V.Vector SerialisedKey
  -> VP.Vector RunIxKeyIx
  -> V.Vector (IOOp (PrimState m) h)
  -> VU.Vector IOResult
  -> m (LookupAcc m h)
intraPageLookupsOn :: forall (m :: * -> *) h.
(PrimMonad m, MonadThrow m) =>
ResolveSerialisedValue
-> LookupAcc m h
-> Vector (Ref (Run m h))
-> Vector SerialisedKey
-> Vector RunIxKeyIx
-> Vector (IOOp (PrimState m) h)
-> Vector IOResult
-> m (LookupAcc m h)
intraPageLookupsOn !ResolveSerialisedValue
resolveV !LookupAcc m h
acc0 !Vector (Ref (Run m h))
rs !Vector SerialisedKey
ks !Vector RunIxKeyIx
rkixs !Vector (IOOp (PrimState m) h)
ioops !Vector IOResult
ioress =
    Bool -> m (LookupAcc m h) -> m (LookupAcc m h)
forall a. (?callStack::CallStack) => Bool -> a -> a
assert (LookupAcc m h -> Size
forall a. Vector a -> Size
V.length LookupAcc m h
acc0 Size -> Size -> Bool
forall a. Eq a => a -> a -> Bool
== Vector SerialisedKey -> Size
forall a. Vector a -> Size
V.length Vector SerialisedKey
ks) (m (LookupAcc m h) -> m (LookupAcc m h))
-> m (LookupAcc m h) -> m (LookupAcc m h)
forall a b. (a -> b) -> a -> b
$ do
    -- We accumulate results into the 'res' vector. When there are several
    -- lookup hits for the same key then we combine the results. The combining
    -- operator is associative but not commutative, so we must do this in the
    -- right order. We start with the write buffer lookup results and then go
    -- through the run lookup results in rkixs, which must be ordered by run.
    --
    -- TODO: reassess the representation of the result vector to try to reduce
    -- intermediate allocations. For example use a less convenient
    -- representation with several vectors (e.g. separate blob info) and
    -- convert to the final convenient representation in a single pass near
    -- the surface API so that all the conversions can be done in one pass
    -- without intermediate allocations.
    --
    MVector
  (PrimState m) (Maybe (Entry SerialisedValue (WeakBlobRef m h)))
res <- LookupAcc m h
-> m (MVector
        (PrimState m) (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
forall (m :: * -> *) a.
PrimMonad m =>
Vector a -> m (MVector (PrimState m) a)
V.unsafeThaw LookupAcc m h
acc0
    MVector
  (PrimState m) (Maybe (Entry SerialisedValue (WeakBlobRef m h)))
-> Size -> m ()
loop MVector
  (PrimState m) (Maybe (Entry SerialisedValue (WeakBlobRef m h)))
res Size
0
    MVector
  (PrimState m) (Maybe (Entry SerialisedValue (WeakBlobRef m h)))
-> m (LookupAcc m h)
forall (m :: * -> *) a.
PrimMonad m =>
MVector (PrimState m) a -> m (Vector a)
V.unsafeFreeze MVector
  (PrimState m) (Maybe (Entry SerialisedValue (WeakBlobRef m h)))
res
  where
    !n :: Size
n = Vector (IOOp (PrimState m) h) -> Size
forall a. Vector a -> Size
V.length Vector (IOOp (PrimState m) h)
ioops

    loop ::
         VM.MVector (PrimState m)
                    (Maybe (Entry SerialisedValue (WeakBlobRef m h)))
      -> Int
      -> m ()
    loop :: MVector
  (PrimState m) (Maybe (Entry SerialisedValue (WeakBlobRef m h)))
-> Size -> m ()
loop !MVector
  (PrimState m) (Maybe (Entry SerialisedValue (WeakBlobRef m h)))
res !Size
ioopix
      | Size
ioopix Size -> Size -> Bool
forall a. Eq a => a -> a -> Bool
== Size
n =  () -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
      | Bool
otherwise = do
          let ioop :: IOOp (PrimState m) h
ioop = Vector (IOOp (PrimState m) h)
ioops Vector (IOOp (PrimState m) h) -> Size -> IOOp (PrimState m) h
forall a. Vector a -> Size -> a
`V.unsafeIndex` Size
ioopix
              iores :: IOResult
iores = Vector IOResult
ioress Vector IOResult -> Size -> IOResult
forall a. Unbox a => Vector a -> Size -> a
`VU.unsafeIndex` Size
ioopix
          IOOp (PrimState m) h -> IOResult -> m ()
checkIOResult IOOp (PrimState m) h
ioop IOResult
iores
          let (RunIxKeyIx !Size
rix !Size
kix) = Vector RunIxKeyIx
rkixs Vector RunIxKeyIx -> Size -> RunIxKeyIx
forall a. Prim a => Vector a -> Size -> a
`VP.unsafeIndex` Size
ioopix
              r :: Ref (Run m h)
r = Vector (Ref (Run m h))
rs Vector (Ref (Run m h)) -> Size -> Ref (Run m h)
forall a. Vector a -> Size -> a
`V.unsafeIndex` Size
rix
              k :: SerialisedKey
k = Vector SerialisedKey
ks Vector SerialisedKey -> Size -> SerialisedKey
forall a. Vector a -> Size -> a
`V.unsafeIndex` Size
kix
          ByteArray
buf <- MutableByteArray (PrimState m) -> m ByteArray
forall (m :: * -> *).
PrimMonad m =>
MutableByteArray (PrimState m) -> m ByteArray
unsafeFreezeByteArray (IOOp (PrimState m) h -> MutableByteArray (PrimState m)
forall s h. IOOp s h -> MutableByteArray s
ioopBuffer IOOp (PrimState m) h
ioop)
          let boff :: Size
boff = BufferOffset -> Size
unBufferOffset (BufferOffset -> Size) -> BufferOffset -> Size
forall a b. (a -> b) -> a -> b
$ IOOp (PrimState m) h -> BufferOffset
forall s h. IOOp s h -> BufferOffset
ioopBufferOffset IOOp (PrimState m) h
ioop
          case RawPage
-> SerialisedKey -> RawPageLookup (Entry SerialisedValue BlobSpan)
rawPageLookup (ByteArray -> Size -> RawPage
makeRawPage ByteArray
buf Size
boff) SerialisedKey
k of
            RawPageLookup (Entry SerialisedValue BlobSpan)
LookupEntryNotPresent -> () -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
            -- Laziness ensures that we only compute the forcing of the value in
            -- the entry when the result is needed.
            LookupEntry Entry SerialisedValue BlobSpan
e         -> do
                let e' :: Entry SerialisedValue (WeakBlobRef m h)
e' = (SerialisedValue -> SerialisedValue)
-> (BlobSpan -> WeakBlobRef m h)
-> Entry SerialisedValue BlobSpan
-> Entry SerialisedValue (WeakBlobRef m h)
forall a b c d. (a -> b) -> (c -> d) -> Entry a c -> Entry b d
forall (p :: * -> * -> *) a b c d.
Bifunctor p =>
(a -> b) -> (c -> d) -> p a c -> p b d
bimap SerialisedValue -> SerialisedValue
copySerialisedValue (Ref (Run m h) -> BlobSpan -> WeakBlobRef m h
forall (m :: * -> *) h.
Ref (Run m h) -> BlobSpan -> WeakBlobRef m h
Run.mkWeakBlobRef Ref (Run m h)
r) Entry SerialisedValue BlobSpan
e
                -- TODO: ^^ we should be able to avoid this allocation by
                -- combining the conversion with other later conversions.
                MVector
  (PrimState m) (Maybe (Entry SerialisedValue (WeakBlobRef m h)))
-> (Entry SerialisedValue (WeakBlobRef m h)
    -> Entry SerialisedValue (WeakBlobRef m h)
    -> Entry SerialisedValue (WeakBlobRef m h))
-> Size
-> Entry SerialisedValue (WeakBlobRef m h)
-> m ()
forall (m :: * -> *) a.
PrimMonad m =>
MVector (PrimState m) (Maybe a)
-> (a -> a -> a) -> Size -> a -> m ()
V.unsafeInsertWithMStrict MVector
  (PrimState m) (Maybe (Entry SerialisedValue (WeakBlobRef m h)))
res (ResolveSerialisedValue
-> Entry SerialisedValue (WeakBlobRef m h)
-> Entry SerialisedValue (WeakBlobRef m h)
-> Entry SerialisedValue (WeakBlobRef m h)
forall v b. (v -> v -> v) -> Entry v b -> Entry v b -> Entry v b
combine ResolveSerialisedValue
resolveV) Size
kix Entry SerialisedValue (WeakBlobRef m h)
e'
            -- Laziness ensures that we only compute the appending of the prefix
            -- and suffix when the result is needed. We do not use 'force' here,
            -- since appending already creates a new primary vector.
            LookupEntryOverflow Entry SerialisedValue BlobSpan
e Word32
m -> do
                let v' :: SerialisedValue -> SerialisedValue
v' (SerialisedValue RawBytes
v) = RawBytes -> SerialisedValue
SerialisedValue (RawBytes -> SerialisedValue) -> RawBytes -> SerialisedValue
forall a b. (a -> b) -> a -> b
$ RawBytes
v RawBytes -> RawBytes -> RawBytes
forall a. Semigroup a => a -> a -> a
<>
                      Vector Word8 -> RawBytes
RawBytes (Size -> Size -> ByteArray -> Vector Word8
forall a. Prim a => Size -> Size -> ByteArray -> Vector a
V.mkPrimVector
                                  (BufferOffset -> Size
unBufferOffset (IOOp (PrimState m) h -> BufferOffset
forall s h. IOOp s h -> BufferOffset
ioopBufferOffset IOOp (PrimState m) h
ioop) Size -> Size -> Size
forall a. Num a => a -> a -> a
+ Size
4096)
                                  (Word32 -> Size
forall a b. (Integral a, Num b) => a -> b
fromIntegral Word32
m)
                                  ByteArray
buf)
                    e' :: Entry SerialisedValue (WeakBlobRef m h)
e' = (SerialisedValue -> SerialisedValue)
-> (BlobSpan -> WeakBlobRef m h)
-> Entry SerialisedValue BlobSpan
-> Entry SerialisedValue (WeakBlobRef m h)
forall a b c d. (a -> b) -> (c -> d) -> Entry a c -> Entry b d
forall (p :: * -> * -> *) a b c d.
Bifunctor p =>
(a -> b) -> (c -> d) -> p a c -> p b d
bimap SerialisedValue -> SerialisedValue
v' (Ref (Run m h) -> BlobSpan -> WeakBlobRef m h
forall (m :: * -> *) h.
Ref (Run m h) -> BlobSpan -> WeakBlobRef m h
Run.mkWeakBlobRef Ref (Run m h)
r) Entry SerialisedValue BlobSpan
e
                MVector
  (PrimState m) (Maybe (Entry SerialisedValue (WeakBlobRef m h)))
-> (Entry SerialisedValue (WeakBlobRef m h)
    -> Entry SerialisedValue (WeakBlobRef m h)
    -> Entry SerialisedValue (WeakBlobRef m h))
-> Size
-> Entry SerialisedValue (WeakBlobRef m h)
-> m ()
forall (m :: * -> *) a.
PrimMonad m =>
MVector (PrimState m) (Maybe a)
-> (a -> a -> a) -> Size -> a -> m ()
V.unsafeInsertWithMStrict MVector
  (PrimState m) (Maybe (Entry SerialisedValue (WeakBlobRef m h)))
res (ResolveSerialisedValue
-> Entry SerialisedValue (WeakBlobRef m h)
-> Entry SerialisedValue (WeakBlobRef m h)
-> Entry SerialisedValue (WeakBlobRef m h)
forall v b. (v -> v -> v) -> Entry v b -> Entry v b -> Entry v b
combine ResolveSerialisedValue
resolveV) Size
kix Entry SerialisedValue (WeakBlobRef m h)
e'
          MVector
  (PrimState m) (Maybe (Entry SerialisedValue (WeakBlobRef m h)))
-> Size -> m ()
loop MVector
  (PrimState m) (Maybe (Entry SerialisedValue (WeakBlobRef m h)))
res (Size
ioopix Size -> Size -> Size
forall a. Num a => a -> a -> a
+ Size
1)

    -- Check that the IOOp was performed succesfully, and that it wrote/read
    -- exactly as many bytes as we expected. If not, then the buffer won't
    -- contain the correct number of disk-page bytes, so we throw an exception.
    checkIOResult :: IOOp (PrimState m) h -> IOResult -> m ()
    checkIOResult :: IOOp (PrimState m) h -> IOResult -> m ()
checkIOResult IOOp (PrimState m) h
ioop (IOResult ByteCount
m) =
        Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (ByteCount
expected ByteCount -> ByteCount -> Bool
forall a. Eq a => a -> a -> Bool
== ByteCount
m) (m () -> m ())
-> (TableCorruptedError -> m ()) -> TableCorruptedError -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TableCorruptedError -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (TableCorruptedError -> m ()) -> TableCorruptedError -> m ()
forall a b. (a -> b) -> a -> b
$
          ByteCount -> ByteCount -> TableCorruptedError
ErrLookupByteCountDiscrepancy ByteCount
expected ByteCount
m
      where expected :: ByteCount
expected = IOOp (PrimState m) h -> ByteCount
forall s h. IOOp s h -> ByteCount
ioopByteCount IOOp (PrimState m) h
ioop

    -- Force a serialised value to not retain any memory by copying the
    -- underlying raw bytes.
    copySerialisedValue :: SerialisedValue -> SerialisedValue
    copySerialisedValue :: SerialisedValue -> SerialisedValue
copySerialisedValue (SerialisedValue RawBytes
rb) =
        RawBytes -> SerialisedValue
SerialisedValue (RawBytes -> RawBytes
RB.copy RawBytes
rb)