{-# LANGUAGE RecordWildCards #-}
{-# OPTIONS_HADDOCK not-home #-}
{- HLINT ignore "Redundant lambda" -}
{- HLINT ignore "Use camelCase" -}

-- | Incremental (in-memory portion of) run consruction
--
module Database.LSMTree.Internal.RunAcc (
    RunAcc (..)
  , new
  , unsafeFinalise
    -- * Adding key\/op pairs
    -- | There are a few variants of actions to add key\/op pairs to the run
    -- accumulator. Which one to use depends on a couple questions:
    --
    -- * Is it fully in memory or is it pre-serialised and only partly in
    --   memory?
    -- * Is the key\/op pair known to be \"small\" or \"large\"?
    --
    -- If it's in memory but it's not known whether it's small or large then
    -- use 'addKeyOp'. One can use 'entryWouldFitInPage' to find out if it's
    -- small or large. If it's in memory and known to be small or large then
    -- use 'addSmallKeyOp' or 'addLargeKeyOp' as appropriate. If it's large
    -- and pre-serialised, use 'addLargeSerialisedKeyOp' but note its
    -- constraints carefully.
    --
  , addKeyOp
  , addSmallKeyOp
  , addLargeKeyOp
  , addLargeSerialisedKeyOp
  , PageAcc.entryWouldFitInPage
    -- * Bloom filter allocation
  , RunBloomFilterAlloc (..)
    -- ** Exposed for testing
  , newMBloom
  , numHashFunctions
  , falsePositiveRate
  ) where

import           Control.DeepSeq (NFData (..))
import           Control.Exception (assert)
import           Control.Monad.ST.Strict
import           Data.BloomFilter (Bloom, MBloom)
import qualified Data.BloomFilter as Bloom
import qualified Data.BloomFilter.Easy as Bloom.Easy
import qualified Data.BloomFilter.Mutable as MBloom
import           Data.Primitive.PrimVar (PrimVar, modifyPrimVar, newPrimVar,
                     readPrimVar)
import           Data.Word (Word64)
import           Database.LSMTree.Internal.Assertions (fromIntegralChecked)
import           Database.LSMTree.Internal.BlobRef (BlobSpan (..))
import           Database.LSMTree.Internal.Chunk (Chunk)
import           Database.LSMTree.Internal.Entry (Entry (..), NumEntries (..))
import           Database.LSMTree.Internal.Index (Index, IndexAcc, IndexType)
import qualified Database.LSMTree.Internal.Index as Index (appendMulti,
                     appendSingle, newWithDefaults, unsafeEnd)
import           Database.LSMTree.Internal.PageAcc (PageAcc)
import qualified Database.LSMTree.Internal.PageAcc as PageAcc
import qualified Database.LSMTree.Internal.PageAcc1 as PageAcc
import           Database.LSMTree.Internal.RawOverflowPage
import           Database.LSMTree.Internal.RawPage (RawPage)
import qualified Database.LSMTree.Internal.RawPage as RawPage
import           Database.LSMTree.Internal.Serialise (SerialisedKey,
                     SerialisedValue)

{-------------------------------------------------------------------------------
  Incremental, in-memory run construction
-------------------------------------------------------------------------------}

-- | The run accumulator is a mutable structure that accumulates key\/op pairs.
-- It yields pages and chunks of the index incrementally, and returns the
-- Bloom filter and complete index at the end.
--
-- Use 'new' to start run construction, add new key\/operation pairs to the run
-- by using 'addKeyOp' and co, and complete run construction using
-- 'unsafeFinalise'.
data RunAcc s = RunAcc {
      forall s. RunAcc s -> MBloom s SerialisedKey
mbloom     :: !(MBloom s SerialisedKey)
    , forall s. RunAcc s -> IndexAcc s
mindex     :: !(IndexAcc s)
    , forall s. RunAcc s -> PageAcc s
mpageacc   :: !(PageAcc s)
    , forall s. RunAcc s -> PrimVar s Int
entryCount :: !(PrimVar s Int)
    }

-- | @'new' nentries@ starts an incremental run construction.
--
-- @nentries@ should be an upper bound on the expected number of entries in the
-- output run.
new ::
     NumEntries
  -> RunBloomFilterAlloc
  -> IndexType
  -> ST s (RunAcc s)
new :: forall s.
NumEntries -> RunBloomFilterAlloc -> IndexType -> ST s (RunAcc s)
new NumEntries
nentries RunBloomFilterAlloc
alloc IndexType
indexType = do
    MBloom s SerialisedKey
mbloom <- NumEntries -> RunBloomFilterAlloc -> ST s (MBloom s SerialisedKey)
forall s a. NumEntries -> RunBloomFilterAlloc -> ST s (MBloom s a)
newMBloom NumEntries
nentries RunBloomFilterAlloc
alloc
    IndexAcc s
mindex <- IndexType -> ST s (IndexAcc s)
forall s. IndexType -> ST s (IndexAcc s)
Index.newWithDefaults IndexType
indexType
    PageAcc s
mpageacc <- ST s (PageAcc s)
forall s. ST s (PageAcc s)
PageAcc.newPageAcc
    PrimVar s Int
entryCount <- Int -> ST s (PrimVar (PrimState (ST s)) Int)
forall (m :: * -> *) a.
(PrimMonad m, Prim a) =>
a -> m (PrimVar (PrimState m) a)
newPrimVar Int
0
    RunAcc s -> ST s (RunAcc s)
forall a. a -> ST s a
forall (f :: * -> *) a. Applicative f => a -> f a
pure RunAcc{MBloom s SerialisedKey
PrimVar s Int
PageAcc s
IndexAcc s
mbloom :: MBloom s SerialisedKey
mindex :: IndexAcc s
mpageacc :: PageAcc s
entryCount :: PrimVar s Int
mbloom :: MBloom s SerialisedKey
mindex :: IndexAcc s
mpageacc :: PageAcc s
entryCount :: PrimVar s Int
..}

-- | Finalise an incremental run construction. Do /not/ use a 'RunAcc' after
-- finalising it.
--
-- The frozen bloom filter and compact index will be returned, along with the
-- final page of the run (if necessary), and the remaining chunks of the
-- incrementally constructed compact index.
unsafeFinalise ::
     RunAcc s
  -> ST s ( Maybe RawPage
          , Maybe Chunk
          , Bloom SerialisedKey
          , Index
          , NumEntries
          )
unsafeFinalise :: forall s.
RunAcc s
-> ST
     s
     (Maybe RawPage, Maybe Chunk, Bloom SerialisedKey, Index,
      NumEntries)
unsafeFinalise racc :: RunAcc s
racc@RunAcc {MBloom s SerialisedKey
PrimVar s Int
PageAcc s
IndexAcc s
mbloom :: forall s. RunAcc s -> MBloom s SerialisedKey
mindex :: forall s. RunAcc s -> IndexAcc s
mpageacc :: forall s. RunAcc s -> PageAcc s
entryCount :: forall s. RunAcc s -> PrimVar s Int
mbloom :: MBloom s SerialisedKey
mindex :: IndexAcc s
mpageacc :: PageAcc s
entryCount :: PrimVar s Int
..} = do
    Maybe (RawPage, Maybe Chunk)
mpagemchunk <- RunAcc s -> ST s (Maybe (RawPage, Maybe Chunk))
forall s. RunAcc s -> ST s (Maybe (RawPage, Maybe Chunk))
flushPageIfNonEmpty RunAcc s
racc
    (Maybe Chunk
mchunk', Index
index) <- IndexAcc s -> ST s (Maybe Chunk, Index)
forall s. IndexAcc s -> ST s (Maybe Chunk, Index)
Index.unsafeEnd IndexAcc s
mindex
    Bloom SerialisedKey
bloom <- MBloom s SerialisedKey -> ST s (Bloom SerialisedKey)
forall s (h :: * -> *) a. MBloom' s h a -> ST s (Bloom' h a)
Bloom.unsafeFreeze MBloom s SerialisedKey
mbloom
    Int
numEntries <- PrimVar (PrimState (ST s)) Int -> ST s Int
forall (m :: * -> *) a.
(PrimMonad m, Prim a) =>
PrimVar (PrimState m) a -> m a
readPrimVar PrimVar s Int
PrimVar (PrimState (ST s)) Int
entryCount
    let !mpage :: Maybe RawPage
mpage  = (RawPage, Maybe Chunk) -> RawPage
forall a b. (a, b) -> a
fst ((RawPage, Maybe Chunk) -> RawPage)
-> Maybe (RawPage, Maybe Chunk) -> Maybe RawPage
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe (RawPage, Maybe Chunk)
mpagemchunk
        !mchunk :: Maybe Chunk
mchunk = Maybe (RawPage, Maybe Chunk) -> Maybe Chunk -> Maybe Chunk
selectChunk Maybe (RawPage, Maybe Chunk)
mpagemchunk Maybe Chunk
mchunk'
    (Maybe RawPage, Maybe Chunk, Bloom SerialisedKey, Index,
 NumEntries)
-> ST
     s
     (Maybe RawPage, Maybe Chunk, Bloom SerialisedKey, Index,
      NumEntries)
forall a. a -> ST s a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe RawPage
mpage, Maybe Chunk
mchunk, Bloom SerialisedKey
bloom, Index
index, Int -> NumEntries
NumEntries Int
numEntries)
  where
    selectChunk :: Maybe (RawPage, Maybe Chunk)
                -> Maybe Chunk
                -> Maybe Chunk
    selectChunk :: Maybe (RawPage, Maybe Chunk) -> Maybe Chunk -> Maybe Chunk
selectChunk (Just (RawPage
_page, Just Chunk
_chunk)) (Just Chunk
_chunk') =
        -- If flushing the page accumulator gives us an index chunk then
        -- the index can't have any more chunks when we finalise the index.
        [Char] -> Maybe Chunk
forall a. HasCallStack => [Char] -> a
error [Char]
"unsafeFinalise: impossible double final chunk"
    selectChunk (Just (RawPage
_page, Just Chunk
chunk)) Maybe Chunk
_ = Chunk -> Maybe Chunk
forall a. a -> Maybe a
Just Chunk
chunk
    selectChunk Maybe (RawPage, Maybe Chunk)
_ (Just Chunk
chunk)               = Chunk -> Maybe Chunk
forall a. a -> Maybe a
Just Chunk
chunk
    selectChunk Maybe (RawPage, Maybe Chunk)
_ Maybe Chunk
_                          = Maybe Chunk
forall a. Maybe a
Nothing

-- | Add a key\/op pair with an optional blob span to the run accumulator.
--
-- Note that this version expects the full value to be in the given'Entry', not
-- just a prefix of the value that fits into a single page.
--
-- If the key\/op pair is known to be \"small\" or \"large\" then you can use
-- the special versions 'addSmallKeyOp' or 'addLargeKeyOp'. If it is
-- pre-serialised, use 'addLargeSerialisedKeyOp'.
--
addKeyOp ::
     RunAcc s
  -> SerialisedKey
  -> Entry SerialisedValue BlobSpan -- ^ the full value, not just a prefix
  -> ST s ([RawPage], [RawOverflowPage], [Chunk])
addKeyOp :: forall s.
RunAcc s
-> SerialisedKey
-> Entry SerialisedValue BlobSpan
-> ST s ([RawPage], [RawOverflowPage], [Chunk])
addKeyOp RunAcc s
racc SerialisedKey
k Entry SerialisedValue BlobSpan
e
  | SerialisedKey -> Entry SerialisedValue BlobSpan -> Bool
forall b. SerialisedKey -> Entry SerialisedValue b -> Bool
PageAcc.entryWouldFitInPage SerialisedKey
k Entry SerialisedValue BlobSpan
e = Maybe (RawPage, Maybe Chunk)
-> ([RawPage], [RawOverflowPage], [Chunk])
smallToLarge (Maybe (RawPage, Maybe Chunk)
 -> ([RawPage], [RawOverflowPage], [Chunk]))
-> ST s (Maybe (RawPage, Maybe Chunk))
-> ST s ([RawPage], [RawOverflowPage], [Chunk])
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> RunAcc s
-> SerialisedKey
-> Entry SerialisedValue BlobSpan
-> ST s (Maybe (RawPage, Maybe Chunk))
forall s.
RunAcc s
-> SerialisedKey
-> Entry SerialisedValue BlobSpan
-> ST s (Maybe (RawPage, Maybe Chunk))
addSmallKeyOp RunAcc s
racc SerialisedKey
k Entry SerialisedValue BlobSpan
e
  | Bool
otherwise                       =                  RunAcc s
-> SerialisedKey
-> Entry SerialisedValue BlobSpan
-> ST s ([RawPage], [RawOverflowPage], [Chunk])
forall s.
RunAcc s
-> SerialisedKey
-> Entry SerialisedValue BlobSpan
-> ST s ([RawPage], [RawOverflowPage], [Chunk])
addLargeKeyOp RunAcc s
racc SerialisedKey
k Entry SerialisedValue BlobSpan
e
  where
    smallToLarge :: Maybe (RawPage, Maybe Chunk)
                 -> ([RawPage], [RawOverflowPage], [Chunk])
    smallToLarge :: Maybe (RawPage, Maybe Chunk)
-> ([RawPage], [RawOverflowPage], [Chunk])
smallToLarge Maybe (RawPage, Maybe Chunk)
Nothing                   = ([],     [], [])
    smallToLarge (Just (RawPage
page, Maybe Chunk
Nothing))    = ([RawPage
page], [], [])
    smallToLarge (Just (RawPage
page, Just Chunk
chunk)) = ([RawPage
page], [], [Chunk
chunk])

-- | Add a \"small\" key\/op pair with an optional blob span to the run
-- accumulator.
--
-- This version is /only/ for small entries that can fit within a single page.
-- Use 'addLargeKeyOp' if the entry is bigger than a page. If this distinction
-- is not known at the use site, use 'PageAcc.entryWouldFitInPage' to determine
-- which case applies, or use 'addKeyOp'.
--
-- This is guaranteed to add the key\/op, and it may yield (at most one) page.
--
addSmallKeyOp ::
     RunAcc s
  -> SerialisedKey
  -> Entry SerialisedValue BlobSpan
  -> ST s (Maybe (RawPage, Maybe Chunk))
addSmallKeyOp :: forall s.
RunAcc s
-> SerialisedKey
-> Entry SerialisedValue BlobSpan
-> ST s (Maybe (RawPage, Maybe Chunk))
addSmallKeyOp racc :: RunAcc s
racc@RunAcc{MBloom s SerialisedKey
PrimVar s Int
PageAcc s
IndexAcc s
mbloom :: forall s. RunAcc s -> MBloom s SerialisedKey
mindex :: forall s. RunAcc s -> IndexAcc s
mpageacc :: forall s. RunAcc s -> PageAcc s
entryCount :: forall s. RunAcc s -> PrimVar s Int
mbloom :: MBloom s SerialisedKey
mindex :: IndexAcc s
mpageacc :: PageAcc s
entryCount :: PrimVar s Int
..} SerialisedKey
k Entry SerialisedValue BlobSpan
e =
  Bool
-> ST s (Maybe (RawPage, Maybe Chunk))
-> ST s (Maybe (RawPage, Maybe Chunk))
forall a. HasCallStack => Bool -> a -> a
assert (SerialisedKey -> Entry SerialisedValue BlobSpan -> Bool
forall b. SerialisedKey -> Entry SerialisedValue b -> Bool
PageAcc.entryWouldFitInPage SerialisedKey
k Entry SerialisedValue BlobSpan
e) (ST s (Maybe (RawPage, Maybe Chunk))
 -> ST s (Maybe (RawPage, Maybe Chunk)))
-> ST s (Maybe (RawPage, Maybe Chunk))
-> ST s (Maybe (RawPage, Maybe Chunk))
forall a b. (a -> b) -> a -> b
$ do
    PrimVar (PrimState (ST s)) Int -> (Int -> Int) -> ST s ()
forall (m :: * -> *) a.
(PrimMonad m, Prim a) =>
PrimVar (PrimState m) a -> (a -> a) -> m ()
modifyPrimVar PrimVar s Int
PrimVar (PrimState (ST s)) Int
entryCount (Int -> Int -> Int
forall a. Num a => a -> a -> a
+Int
1)
    MBloom s SerialisedKey -> SerialisedKey -> ST s ()
forall (h :: * -> *) a s.
(Hashes h, Hashable a) =>
MBloom' s h a -> a -> ST s ()
MBloom.insert MBloom s SerialisedKey
mbloom SerialisedKey
k

    Bool
pageBoundaryNeeded <-
        -- Try adding the key/op to the page accumulator to see if it fits. If
        -- it does not fit, a page boundary is needed.
        Bool -> Bool
not (Bool -> Bool) -> ST s Bool -> ST s Bool
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> PageAcc s
-> SerialisedKey -> Entry SerialisedValue BlobSpan -> ST s Bool
forall s.
PageAcc s
-> SerialisedKey -> Entry SerialisedValue BlobSpan -> ST s Bool
PageAcc.pageAccAddElem PageAcc s
mpageacc SerialisedKey
k Entry SerialisedValue BlobSpan
e

    if Bool
pageBoundaryNeeded
      then do
        -- We need a page boundary. If the current page is empty then we have
        -- a boundary already, otherwise we need to flush the current page.
        Maybe (RawPage, Maybe Chunk)
mpagemchunk <- RunAcc s -> ST s (Maybe (RawPage, Maybe Chunk))
forall s. RunAcc s -> ST s (Maybe (RawPage, Maybe Chunk))
flushPageIfNonEmpty RunAcc s
racc
        -- The current page is now empty, either because it was already empty
        -- or because we just flushed it. Adding the new key/op to an empty
        -- page must now succeed, because we know it fits in a page.
        Bool
added <- PageAcc s
-> SerialisedKey -> Entry SerialisedValue BlobSpan -> ST s Bool
forall s.
PageAcc s
-> SerialisedKey -> Entry SerialisedValue BlobSpan -> ST s Bool
PageAcc.pageAccAddElem PageAcc s
mpageacc SerialisedKey
k Entry SerialisedValue BlobSpan
e
        Bool
-> ST s (Maybe (RawPage, Maybe Chunk))
-> ST s (Maybe (RawPage, Maybe Chunk))
forall a. HasCallStack => Bool -> a -> a
assert Bool
added (ST s (Maybe (RawPage, Maybe Chunk))
 -> ST s (Maybe (RawPage, Maybe Chunk)))
-> ST s (Maybe (RawPage, Maybe Chunk))
-> ST s (Maybe (RawPage, Maybe Chunk))
forall a b. (a -> b) -> a -> b
$ Maybe (RawPage, Maybe Chunk) -> ST s (Maybe (RawPage, Maybe Chunk))
forall a. a -> ST s a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe (RawPage, Maybe Chunk)
mpagemchunk

      else Maybe (RawPage, Maybe Chunk) -> ST s (Maybe (RawPage, Maybe Chunk))
forall a. a -> ST s a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe (RawPage, Maybe Chunk)
forall a. Maybe a
Nothing

-- | Add a \"large\" key\/op pair with an optional blob span to the run
-- accumulator.
--
-- This version is /only/ for large entries that span multiple pages. Use
-- 'addSmallKeyOp' if the entry is smaller than a page. If this distinction
-- is not known at the use site, use 'PageAcc.entryWouldFitInPage' to determine
-- which case applies.
--
-- Note that this version expects the full large value to be in the given
-- 'Entry', not just the prefix of the value that fits into a single page.
-- For large multi-page values that are represented by a pre-serialised
-- 'RawPage' (as occurs when merging runs), use 'addLargeSerialisedKeyOp'.
--
-- This is guaranteed to add the key\/op. It will yield one or two 'RawPage's,
-- and one or more 'RawOverflowPage's. These pages should be written out to
-- the run's page file in that order, the 'RawPage's followed by the
-- 'RawOverflowPage's.
--
addLargeKeyOp ::
     RunAcc s
  -> SerialisedKey
  -> Entry SerialisedValue BlobSpan -- ^ the full value, not just a prefix
  -> ST s ([RawPage], [RawOverflowPage], [Chunk])
addLargeKeyOp :: forall s.
RunAcc s
-> SerialisedKey
-> Entry SerialisedValue BlobSpan
-> ST s ([RawPage], [RawOverflowPage], [Chunk])
addLargeKeyOp racc :: RunAcc s
racc@RunAcc{MBloom s SerialisedKey
PrimVar s Int
PageAcc s
IndexAcc s
mbloom :: forall s. RunAcc s -> MBloom s SerialisedKey
mindex :: forall s. RunAcc s -> IndexAcc s
mpageacc :: forall s. RunAcc s -> PageAcc s
entryCount :: forall s. RunAcc s -> PrimVar s Int
mbloom :: MBloom s SerialisedKey
mindex :: IndexAcc s
mpageacc :: PageAcc s
entryCount :: PrimVar s Int
..} SerialisedKey
k Entry SerialisedValue BlobSpan
e =
  Bool
-> ST s ([RawPage], [RawOverflowPage], [Chunk])
-> ST s ([RawPage], [RawOverflowPage], [Chunk])
forall a. HasCallStack => Bool -> a -> a
assert (Bool -> Bool
not (SerialisedKey -> Entry SerialisedValue BlobSpan -> Bool
forall b. SerialisedKey -> Entry SerialisedValue b -> Bool
PageAcc.entryWouldFitInPage SerialisedKey
k Entry SerialisedValue BlobSpan
e)) (ST s ([RawPage], [RawOverflowPage], [Chunk])
 -> ST s ([RawPage], [RawOverflowPage], [Chunk]))
-> ST s ([RawPage], [RawOverflowPage], [Chunk])
-> ST s ([RawPage], [RawOverflowPage], [Chunk])
forall a b. (a -> b) -> a -> b
$ do
    PrimVar (PrimState (ST s)) Int -> (Int -> Int) -> ST s ()
forall (m :: * -> *) a.
(PrimMonad m, Prim a) =>
PrimVar (PrimState m) a -> (a -> a) -> m ()
modifyPrimVar PrimVar s Int
PrimVar (PrimState (ST s)) Int
entryCount (Int -> Int -> Int
forall a. Num a => a -> a -> a
+Int
1)
    MBloom s SerialisedKey -> SerialisedKey -> ST s ()
forall (h :: * -> *) a s.
(Hashes h, Hashable a) =>
MBloom' s h a -> a -> ST s ()
MBloom.insert MBloom s SerialisedKey
mbloom SerialisedKey
k

    -- If the existing page accumulator is non-empty, we flush it, since the
    -- new large key/op will need more than one page to itself.
    Maybe (RawPage, Maybe Chunk)
mpagemchunkPre <- RunAcc s -> ST s (Maybe (RawPage, Maybe Chunk))
forall s. RunAcc s -> ST s (Maybe (RawPage, Maybe Chunk))
flushPageIfNonEmpty RunAcc s
racc

    -- Make the new page and overflow pages. Add the span of pages to the index.
    let (RawPage
page, [RawOverflowPage]
overflowPages) = SerialisedKey
-> Entry SerialisedValue BlobSpan -> (RawPage, [RawOverflowPage])
PageAcc.singletonPage SerialisedKey
k Entry SerialisedValue BlobSpan
e
    [Chunk]
chunks <- (SerialisedKey, Word32) -> IndexAcc s -> ST s [Chunk]
forall s. (SerialisedKey, Word32) -> IndexAcc s -> ST s [Chunk]
Index.appendMulti (SerialisedKey
k, Int -> Word32
forall a b. (Integral a, Num b) => a -> b
fromIntegral ([RawOverflowPage] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [RawOverflowPage]
overflowPages)) IndexAcc s
mindex

    -- Combine the results with anything we flushed before
    let (![RawPage]
pages, ![Chunk]
chunks') = Maybe (RawPage, Maybe Chunk)
-> RawPage -> [Chunk] -> ([RawPage], [Chunk])
selectPagesAndChunks Maybe (RawPage, Maybe Chunk)
mpagemchunkPre RawPage
page [Chunk]
chunks
    ([RawPage], [RawOverflowPage], [Chunk])
-> ST s ([RawPage], [RawOverflowPage], [Chunk])
forall a. a -> ST s a
forall (m :: * -> *) a. Monad m => a -> m a
return ([RawPage]
pages, [RawOverflowPage]
overflowPages, [Chunk]
chunks')

-- | Add a \"large\" pre-serialised key\/op entry to the run accumulator.
--
-- This version is for large entries that span multiple pages and are
-- represented by already serialised 'RawPage' and one or more
-- 'RawOverflowPage's.
--
-- For this case, the caller provides the key, the raw page it is from and the
-- overflow pages. The raw page and overflow pages are returned along with any
-- other pages that need to be yielded (in order). The caller should write out
-- the pages to the run's page file in order: the returned 'RawPage's followed
-- by the 'RawOverflowPage's (the same as for 'addLargeKeyOp').
--
-- Note that this action is not appropriate for key\/op entries that would fit
-- within a page ('PageAcc.entryWouldFitInPage') but just /happen/ to have
-- ended up in a page on their own in an input to a merge. A page can end up
-- with a single entry because a page boundary was needed rather than because
-- the entry itself was too big. Furthermore, pre-serialised pages can only be
-- used unaltered if the entry does /not/ use a 'BlobSpan', since the 'BlobSpan'
-- typically needs to be modified. Thus the caller should use the following
-- tests to decide if 'addLargeSerialisedKeyOp' should be used:
--
-- 1. The entry does not use a 'BlobSpan'.
-- 2. The entry definitely overflows onto one or more overflow pages.
--
-- Otherwise, use 'addLargeKeyOp' or 'addSmallKeyOp' as appropriate.
--
addLargeSerialisedKeyOp ::
     RunAcc s
  -> SerialisedKey     -- ^ The key
  -> RawPage           -- ^ The page that this key\/op is in, which must be the
                       -- first page of a multi-page representation of a single
                       -- key\/op /without/ a 'BlobSpan'.
  -> [RawOverflowPage] -- ^ The overflow pages for this key\/op
  -> ST s ([RawPage], [RawOverflowPage], [Chunk])
addLargeSerialisedKeyOp :: forall s.
RunAcc s
-> SerialisedKey
-> RawPage
-> [RawOverflowPage]
-> ST s ([RawPage], [RawOverflowPage], [Chunk])
addLargeSerialisedKeyOp racc :: RunAcc s
racc@RunAcc{MBloom s SerialisedKey
PrimVar s Int
PageAcc s
IndexAcc s
mbloom :: forall s. RunAcc s -> MBloom s SerialisedKey
mindex :: forall s. RunAcc s -> IndexAcc s
mpageacc :: forall s. RunAcc s -> PageAcc s
entryCount :: forall s. RunAcc s -> PrimVar s Int
mbloom :: MBloom s SerialisedKey
mindex :: IndexAcc s
mpageacc :: PageAcc s
entryCount :: PrimVar s Int
..} SerialisedKey
k RawPage
page [RawOverflowPage]
overflowPages =
  Bool
-> ST s ([RawPage], [RawOverflowPage], [Chunk])
-> ST s ([RawPage], [RawOverflowPage], [Chunk])
forall a. HasCallStack => Bool -> a -> a
assert (RawPage -> Word16
RawPage.rawPageNumKeys RawPage
page Word16 -> Word16 -> Bool
forall a. Eq a => a -> a -> Bool
== Word16
1) (ST s ([RawPage], [RawOverflowPage], [Chunk])
 -> ST s ([RawPage], [RawOverflowPage], [Chunk]))
-> ST s ([RawPage], [RawOverflowPage], [Chunk])
-> ST s ([RawPage], [RawOverflowPage], [Chunk])
forall a b. (a -> b) -> a -> b
$
  Bool
-> ST s ([RawPage], [RawOverflowPage], [Chunk])
-> ST s ([RawPage], [RawOverflowPage], [Chunk])
forall a. HasCallStack => Bool -> a -> a
assert (RawPage -> Int -> Word64
RawPage.rawPageHasBlobSpanAt RawPage
page Int
0 Word64 -> Word64 -> Bool
forall a. Eq a => a -> a -> Bool
== Word64
0) (ST s ([RawPage], [RawOverflowPage], [Chunk])
 -> ST s ([RawPage], [RawOverflowPage], [Chunk]))
-> ST s ([RawPage], [RawOverflowPage], [Chunk])
-> ST s ([RawPage], [RawOverflowPage], [Chunk])
forall a b. (a -> b) -> a -> b
$
  Bool
-> ST s ([RawPage], [RawOverflowPage], [Chunk])
-> ST s ([RawPage], [RawOverflowPage], [Chunk])
forall a. HasCallStack => Bool -> a -> a
assert (RawPage -> Int
RawPage.rawPageOverflowPages RawPage
page Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0) (ST s ([RawPage], [RawOverflowPage], [Chunk])
 -> ST s ([RawPage], [RawOverflowPage], [Chunk]))
-> ST s ([RawPage], [RawOverflowPage], [Chunk])
-> ST s ([RawPage], [RawOverflowPage], [Chunk])
forall a b. (a -> b) -> a -> b
$
  Bool
-> ST s ([RawPage], [RawOverflowPage], [Chunk])
-> ST s ([RawPage], [RawOverflowPage], [Chunk])
forall a. HasCallStack => Bool -> a -> a
assert (RawPage -> Int
RawPage.rawPageOverflowPages RawPage
page Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== [RawOverflowPage] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [RawOverflowPage]
overflowPages) (ST s ([RawPage], [RawOverflowPage], [Chunk])
 -> ST s ([RawPage], [RawOverflowPage], [Chunk]))
-> ST s ([RawPage], [RawOverflowPage], [Chunk])
-> ST s ([RawPage], [RawOverflowPage], [Chunk])
forall a b. (a -> b) -> a -> b
$ do
    PrimVar (PrimState (ST s)) Int -> (Int -> Int) -> ST s ()
forall (m :: * -> *) a.
(PrimMonad m, Prim a) =>
PrimVar (PrimState m) a -> (a -> a) -> m ()
modifyPrimVar PrimVar s Int
PrimVar (PrimState (ST s)) Int
entryCount (Int -> Int -> Int
forall a. Num a => a -> a -> a
+Int
1)
    MBloom s SerialisedKey -> SerialisedKey -> ST s ()
forall (h :: * -> *) a s.
(Hashes h, Hashable a) =>
MBloom' s h a -> a -> ST s ()
MBloom.insert MBloom s SerialisedKey
mbloom SerialisedKey
k

    -- If the existing page accumulator is non-empty, we flush it, since the
    -- new large key/op will need more than one page to itself.
    Maybe (RawPage, Maybe Chunk)
mpagemchunkPre <- RunAcc s -> ST s (Maybe (RawPage, Maybe Chunk))
forall s. RunAcc s -> ST s (Maybe (RawPage, Maybe Chunk))
flushPageIfNonEmpty RunAcc s
racc
    let nOverflowPages :: Int
nOverflowPages = [RawOverflowPage] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [RawOverflowPage]
overflowPages --TODO: consider using vector
    [Chunk]
chunks <- (SerialisedKey, Word32) -> IndexAcc s -> ST s [Chunk]
forall s. (SerialisedKey, Word32) -> IndexAcc s -> ST s [Chunk]
Index.appendMulti (SerialisedKey
k, Int -> Word32
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
nOverflowPages) IndexAcc s
mindex
    let (![RawPage]
pages, ![Chunk]
chunks') = Maybe (RawPage, Maybe Chunk)
-> RawPage -> [Chunk] -> ([RawPage], [Chunk])
selectPagesAndChunks Maybe (RawPage, Maybe Chunk)
mpagemchunkPre RawPage
page [Chunk]
chunks
    ([RawPage], [RawOverflowPage], [Chunk])
-> ST s ([RawPage], [RawOverflowPage], [Chunk])
forall a. a -> ST s a
forall (m :: * -> *) a. Monad m => a -> m a
return ([RawPage]
pages, [RawOverflowPage]
overflowPages, [Chunk]
chunks')

-- | Internal helper: finalise the current page, add the page to the index,
-- reset the page accumulator and return the serialised 'RawPage' along with
-- any index chunk.
--
-- Returns @Nothing@ if the page accumulator was empty.
--
flushPageIfNonEmpty :: RunAcc s -> ST s (Maybe (RawPage, Maybe Chunk))
flushPageIfNonEmpty :: forall s. RunAcc s -> ST s (Maybe (RawPage, Maybe Chunk))
flushPageIfNonEmpty RunAcc{PageAcc s
mpageacc :: forall s. RunAcc s -> PageAcc s
mpageacc :: PageAcc s
mpageacc, IndexAcc s
mindex :: forall s. RunAcc s -> IndexAcc s
mindex :: IndexAcc s
mindex} = do
    Int
nkeys <- PageAcc s -> ST s Int
forall s. PageAcc s -> ST s Int
PageAcc.keysCountPageAcc PageAcc s
mpageacc
    if Int
nkeys Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0
      then do
        -- Grab the min and max keys, and add the page to the index.
        SerialisedKey
minKey <- PageAcc s -> Int -> ST s SerialisedKey
forall s. PageAcc s -> Int -> ST s SerialisedKey
PageAcc.indexKeyPageAcc PageAcc s
mpageacc Int
0
        SerialisedKey
maxKey <- PageAcc s -> Int -> ST s SerialisedKey
forall s. PageAcc s -> Int -> ST s SerialisedKey
PageAcc.indexKeyPageAcc PageAcc s
mpageacc (Int
nkeysInt -> Int -> Int
forall a. Num a => a -> a -> a
-Int
1)
        Maybe Chunk
mchunk <- (SerialisedKey, SerialisedKey) -> IndexAcc s -> ST s (Maybe Chunk)
forall s.
(SerialisedKey, SerialisedKey) -> IndexAcc s -> ST s (Maybe Chunk)
Index.appendSingle (SerialisedKey
minKey, SerialisedKey
maxKey) IndexAcc s
mindex

        -- Now serialise the page and reset the accumulator
        RawPage
page <- PageAcc s -> ST s RawPage
forall s. PageAcc s -> ST s RawPage
PageAcc.serialisePageAcc PageAcc s
mpageacc
        PageAcc s -> ST s ()
forall s. PageAcc s -> ST s ()
PageAcc.resetPageAcc PageAcc s
mpageacc
        Maybe (RawPage, Maybe Chunk) -> ST s (Maybe (RawPage, Maybe Chunk))
forall a. a -> ST s a
forall (m :: * -> *) a. Monad m => a -> m a
return ((RawPage, Maybe Chunk) -> Maybe (RawPage, Maybe Chunk)
forall a. a -> Maybe a
Just (RawPage
page, Maybe Chunk
mchunk))

      else Maybe (RawPage, Maybe Chunk) -> ST s (Maybe (RawPage, Maybe Chunk))
forall a. a -> ST s a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe (RawPage, Maybe Chunk)
forall a. Maybe a
Nothing

-- | Internal helper for 'addLargeKeyOp' and 'addLargeSerialisedKeyOp'.
-- Combine the result of 'flushPageIfNonEmpty' with extra pages and index
-- chunks.
--
selectPagesAndChunks :: Maybe (RawPage, Maybe Chunk)
                     -> RawPage
                     -> [Chunk]
                     -> ([RawPage], [Chunk])
selectPagesAndChunks :: Maybe (RawPage, Maybe Chunk)
-> RawPage -> [Chunk] -> ([RawPage], [Chunk])
selectPagesAndChunks Maybe (RawPage, Maybe Chunk)
mpagemchunkPre RawPage
page [Chunk]
chunks =
  case Maybe (RawPage, Maybe Chunk)
mpagemchunkPre of
    Maybe (RawPage, Maybe Chunk)
Nothing                       -> (         [RawPage
page],          [Chunk]
chunks)
    Just (RawPage
pagePre, Maybe Chunk
Nothing)       -> ([RawPage
pagePre, RawPage
page],          [Chunk]
chunks)
    Just (RawPage
pagePre, Just Chunk
chunkPre) -> ([RawPage
pagePre, RawPage
page], Chunk
chunkPreChunk -> [Chunk] -> [Chunk]
forall a. a -> [a] -> [a]
:[Chunk]
chunks)

{-------------------------------------------------------------------------------
  Bloom filter allocation
-------------------------------------------------------------------------------}

-- | See 'Database.LSMTree.Internal.Config.BloomFilterAlloc'
data RunBloomFilterAlloc =
    -- | Bits per element in a filter
    RunAllocFixed !Word64
  | RunAllocRequestFPR !Double
  deriving stock (Int -> RunBloomFilterAlloc -> ShowS
[RunBloomFilterAlloc] -> ShowS
RunBloomFilterAlloc -> [Char]
(Int -> RunBloomFilterAlloc -> ShowS)
-> (RunBloomFilterAlloc -> [Char])
-> ([RunBloomFilterAlloc] -> ShowS)
-> Show RunBloomFilterAlloc
forall a.
(Int -> a -> ShowS) -> (a -> [Char]) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> RunBloomFilterAlloc -> ShowS
showsPrec :: Int -> RunBloomFilterAlloc -> ShowS
$cshow :: RunBloomFilterAlloc -> [Char]
show :: RunBloomFilterAlloc -> [Char]
$cshowList :: [RunBloomFilterAlloc] -> ShowS
showList :: [RunBloomFilterAlloc] -> ShowS
Show, RunBloomFilterAlloc -> RunBloomFilterAlloc -> Bool
(RunBloomFilterAlloc -> RunBloomFilterAlloc -> Bool)
-> (RunBloomFilterAlloc -> RunBloomFilterAlloc -> Bool)
-> Eq RunBloomFilterAlloc
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: RunBloomFilterAlloc -> RunBloomFilterAlloc -> Bool
== :: RunBloomFilterAlloc -> RunBloomFilterAlloc -> Bool
$c/= :: RunBloomFilterAlloc -> RunBloomFilterAlloc -> Bool
/= :: RunBloomFilterAlloc -> RunBloomFilterAlloc -> Bool
Eq)

instance NFData RunBloomFilterAlloc where
    rnf :: RunBloomFilterAlloc -> ()
rnf (RunAllocFixed Word64
a)      = Word64 -> ()
forall a. NFData a => a -> ()
rnf Word64
a
    rnf (RunAllocRequestFPR Double
a) = Double -> ()
forall a. NFData a => a -> ()
rnf Double
a

newMBloom :: NumEntries -> RunBloomFilterAlloc -> ST s (MBloom s a)
newMBloom :: forall s a. NumEntries -> RunBloomFilterAlloc -> ST s (MBloom s a)
newMBloom (NumEntries Int
nentries) = \case
      RunAllocFixed !Word64
bitsPerEntry    ->
        let !nbits :: Integer
nbits = Word64 -> Integer
forall a b. (Integral a, Num b) => a -> b
fromIntegral Word64
bitsPerEntry Integer -> Integer -> Integer
forall a. Num a => a -> a -> a
* Int -> Integer
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
nentries
        in  Int -> Word64 -> ST s (MBloom s a)
forall s (h :: * -> *) a. Int -> Word64 -> ST s (MBloom' s h a)
MBloom.new
              (Integer -> Int
forall a b.
(HasCallStack, Integral a, Integral b, Show a) =>
a -> b
fromIntegralChecked (Integer -> Int) -> Integer -> Int
forall a b. (a -> b) -> a -> b
$ Integer -> Integer -> Integer
numHashFunctions Integer
nbits (Int -> Integer
forall a b.
(HasCallStack, Integral a, Integral b, Show a) =>
a -> b
fromIntegralChecked Int
nentries))
              (Integer -> Word64
forall a b.
(HasCallStack, Integral a, Integral b, Show a) =>
a -> b
fromIntegralChecked Integer
nbits)
      RunAllocRequestFPR !Double
fpr ->
        Double -> Int -> ST s (MBloom s a)
forall s a. Double -> Int -> ST s (MBloom s a)
Bloom.Easy.easyNew Double
fpr Int
nentries

-- | Computes the optimal number of hash functions that minimises the false
-- positive rate for a bloom filter.
--
-- See Niv Dayan, Manos Athanassoulis, Stratos Idreos,
-- /Optimal Bloom Filters and Adaptive Merging for LSM-Trees/,
-- Footnote 2, page 6.
numHashFunctions ::
     Integer -- ^ Number of bits assigned to the bloom filter.
  -> Integer -- ^ Number of entries inserted into the bloom filter.
  -> Integer
numHashFunctions :: Integer -> Integer -> Integer
numHashFunctions Integer
nbits Integer
nentries = forall a b. (RealFrac a, Integral b) => a -> b
truncate @Double (Double -> Integer) -> Double -> Integer
forall a b. (a -> b) -> a -> b
$ Double -> Double -> Double
forall a. Ord a => a -> a -> a
max Double
1 (Double -> Double) -> Double -> Double
forall a b. (a -> b) -> a -> b
$
    (Integer -> Double
forall a b. (Integral a, Num b) => a -> b
fromIntegral Integer
nbits Double -> Double -> Double
forall a. Fractional a => a -> a -> a
/ Integer -> Double
forall a b. (Integral a, Num b) => a -> b
fromIntegral Integer
nentries) Double -> Double -> Double
forall a. Num a => a -> a -> a
* Double -> Double
forall a. Floating a => a -> a
log Double
2

-- | False positive rate
--
-- Assumes that the bloom filter uses 'numHashFunctions' hash functions.
--
-- See Niv Dayan, Manos Athanassoulis, Stratos Idreos,
-- /Optimal Bloom Filters and Adaptive Merging for LSM-Trees/,
-- Equation 2.
falsePositiveRate ::
       Floating a
    => a  -- ^ entries
    -> a  -- ^ bits
    -> a
falsePositiveRate :: forall a. Floating a => a -> a -> a
falsePositiveRate a
entries a
bits = a -> a
forall a. Floating a => a -> a
exp ((-(a
bits a -> a -> a
forall a. Fractional a => a -> a -> a
/ a
entries)) a -> a -> a
forall a. Num a => a -> a -> a
* a -> a
forall a. Num a => a -> a
sq (a -> a
forall a. Floating a => a -> a
log a
2))

sq :: Num a => a -> a
sq :: forall a. Num a => a -> a
sq a
x = a
x a -> a -> a
forall a. Num a => a -> a -> a
* a
x