{-# OPTIONS_HADDOCK not-home #-}

-- | A compact fence-pointer index for uniformly distributed keys.
--
-- Keys used with a compact index must be at least 8 bytes long.
--
-- TODO: add utility functions for clash probability calculations
--
module Database.LSMTree.Internal.Index.Compact (
    -- $compact
    IndexCompact (..)
    -- * Queries
  , search
  , sizeInPages
  , countClashes
  , hasClashes
    -- * Non-incremental serialisation
  , toLBS
    -- * Incremental serialisation
  , headerLBS
  , finalLBS
  , word64VectorToChunk
    -- * Deserialisation
  , fromSBS
  ) where

import           Control.DeepSeq (NFData (..))
import           Control.Monad (when)
import           Control.Monad.ST
import           Data.Bit hiding (flipBit)
import           Data.Bits (unsafeShiftR, (.&.))
import qualified Data.ByteString.Builder as BB
import qualified Data.ByteString.Builder.Extra as BB
import qualified Data.ByteString.Lazy as LBS
import           Data.ByteString.Short (ShortByteString (..))
import           Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import           Data.Maybe (fromMaybe)
import           Data.Primitive.ByteArray (ByteArray (..), indexByteArray,
                     sizeofByteArray)
import           Data.Primitive.Types (sizeOf)
import qualified Data.Vector.Algorithms.Search as VA
import qualified Data.Vector.Generic as VG
import qualified Data.Vector.Primitive as VP
import qualified Data.Vector.Unboxed as VU
import qualified Data.Vector.Unboxed.Base as VU
import           Data.Word
import           Database.LSMTree.Internal.BitMath
import           Database.LSMTree.Internal.ByteString (byteArrayFromTo)
import           Database.LSMTree.Internal.Chunk (Chunk (Chunk))
import qualified Database.LSMTree.Internal.Chunk as Chunk (toByteString)
import           Database.LSMTree.Internal.Entry (NumEntries (..))
import           Database.LSMTree.Internal.Map.Range (Bound (..))
import           Database.LSMTree.Internal.Page
import           Database.LSMTree.Internal.Serialise
import           Database.LSMTree.Internal.Unsliced
import           Database.LSMTree.Internal.Vector

{- $compact

  A fence-pointer index is a mapping of disk pages, identified by some number
  @i@, to min-max information for keys on that page.

  Fence-pointer indexes can be constructed and serialised incrementally, see
  module "Database.LSMTree.Internal.Index.CompactAcc".

  Given a serialised target key @k@, an index can be 'search'ed to find a disk
  page @i@ that /might/ contain @k@. Fence-pointer indices offer no guarantee of
  whether the page contains the key, but the indices do guarantee that no page
  other than page @i@ could store @k@.

  === Intro

  A run is a file that stores a collection of key-value pairs. The data is
  sorted by key, and the data is divided into disk pages. A fence pointer index
  is a mapping of each disk page to min\/max information for keys on that page.
  As such, the index stores the keys that mark the /boundaries/ for each page. A
  lookup of a key in a run first searches the fence pointer index to find the
  page that contains the relevant key range, before doing a single I/O to
  retrieve the page. The guarantee of an index search is this:

  GUARANTEE: /if/ the key is somewhere in the run, then it can only be in the
  page that is returned by the index search.

  This module presents a /compact/ implementation of a fence pointer index.
  However, we first consider a simple implementation of a fence pointer index.
  After that, we show how we can save on the memory size of the index
  representation if the keys are uniformly distributed, like hashes. Let's start
  with some basic definitions:

  > type Run k v = -- elided
  > type Page k v = -- elided
  > minKey :: Page k v -> k
  > maxKey :: Page k v -> k
  > type PageNo = Int

  The first implementation one may come up with is a vector containing the
  minimum key and maximum key on each page. As such, the index stores the
  key-interval @[minKey p, maxKey p]@ for each page @p@. @search1@ searches the
  vector for the key-interval that contains the search key, if it exists, and
  returns the corresponding vector index as a page number. A design choice of
  ours is that the search will __always__ return a page number, even if the
  index could answer that the key is definitely not in a page (see
  'indexSearches').

  > type Index1 k = V.Vector (k, k)
  >
  > mkIndex1 :: Run k v -> Index1 k
  > mkIndex1 = V.fromList . fmap (\p -> (minKey p, maxKey p))
  >
  > search1 :: k -> Index1 k -> PageNo
  > search1 = -- elided

  We can reduce the memory size of `Index1` by half if we store only the minimum
  keys on each page. As such, the index now stores the key-interval @[minKey p,
  minKey p')@ for each page @p@ and successor page @p'@. GUARANTEE is still
  guaranteed, because the old intervals are strictly contained in the new ones.
  @search2@ searches the vector for the largest key smaller or equal to the
  given one, if it exists, and returns the corresponding vector index as a page
  number.

  > type Index2 k = V.Vector k
  >
  > mkIndex2 :: Run k v -> Index2 k
  > mkIndex2 = V.fromList . fmap minKey
  >
  > search2 :: k -> Index2 k -> PageNo
  > search2 = -- elided

  Now on to creating a more compact representation, which relies on a property
  of the keys that are used: the keys must be uniformly distributed values, like
  hashes.

  === Compact representation

  As mentioned before, we can save on the memory size of the index
  representation if the keys are uniformly distributed, like hashes. From now
  on, we will just assume that our keys are hashes, which can be viewed as
  strings of bits.

  The intuition behind the compact index is this: often, we don't need to look
  at full bit-strings to compare independently generated hashes against
  each other. The probability of the @n@ most significant bits of two
  independently generated hashes matching is @(1/(2^n))@, and if we pick @n@
  large enough then we can expect a very small number of collisions. More
  generally, the expected number of @n@-bit hashes that have to be generated
  before a collision is observed is @2^(n/2)@, see [the birthday
  problem](https://en.wikipedia.org/wiki/Birthday_problem#Probability_of_a_shared_birthday_(collision)).
  Or, phrased differently, if we store only the @n@ most significant bits of
  independently generated hashes in our index, then we can store up to @2^(n/2)@
  of those hashes before the expected number of collisions becomes one. Still,
  we need a way to break ties if there are collisions, because the probability
  of collisions is non-zero.

  ==== Clashes

  In our previous incarnations of the "simple" index, we relied on the minimum
  keys to define the boundaries between pages, or more precisely, the pages'
  key-intervals. If we look only at a subset of the most significant bits, that
  is no longer true in the presence of collisions. Let's illustrate this using
  an example of two contiguous pages @pi@ and @pj@:

  > minKey pi == "00000000"
  > maxKey pi == "10000000"
  > minKey pj == "10000001"
  > maxKey pj == "11111111"

  Say we store only the 4 most significant bits (left-to-right) in our compact
  index. This means that those 4 bits of @maxKey pi@ and @minKey pj@ collide.
  The problem is that the interval @[minKey pi, maxKey pi]@ strictly contains
  the interval @[minKey pi, minKey pj)@! The first 4 bits of @minKey pj@ do not
  actually define the boundary between @pi@ and @pj@, and so there would be no
  way to answer if a search key with first 4 bits "1000" could be found in @pi@
  or @pj@. We call such a situation a /clash/.

  The solution in these cases is to: (i) record that a clash occurred between
  pages @pi@ and @pj@, and (ii) store the full key @maxKey pi@ separately. The
  record of clashes can be implemented simply as a single bit per page: with
  a @True@ bit meaning a clash with the previous page. Note therefore that the
  first page's bit is always going to be @False@. We store the full keys using
  a 'Map'. It is ok that this is not a compact representation, because we
  expect to store full keys for only a very small number of pages.

  The example below shows a simplified view of the compact index implementation
  so far. As an example, we store the @64@ most significant bits of each minimum
  key in the @primary@ index, the record of clashes is called @clashes@, and the
  @IntMap@ is named the @tieBreaker@ map. @search3@ can at any point during the
  search, consult @clashes@ and @tieBreaker@ to /break ties/.

  > --              (primary         , clashes      , tieBreaker)
  > type Index3 k = (VU.Vector Word64, VU.Vector Bit, IntMap k  )
  >
  > mkIndex3 :: Run k v -> Index3 k
  > mkIndex3 = -- elided
  >
  > search3 :: k -> Index3 k -> PageNo
  > search3 = -- elided

  Let's compare the memory size of @Index2@ with @Index3@. Say we have \(n\)
  pages, and keys that are \(k\) bits each, then the memory size of @Index2@ is
  \(O(n~k)\) bits. Alternatively, for the same \(n\) pages, if we store only the
  \(c\) most significant bits, then the memory size of @Index3@ is
  \(O\left(n~c + n + k E\left[\text{collision}~n~c\right]\right)\) bits, where
  the last summand is the expected number of collisions for \(n\) independently
  generated hashes of bit-size \(c\). Precisely, the expected number of
  collisions is \(\frac{n^2 - n}{2 \times 2^c}\), so we can simplify the memory
  size to \(O\left(n~c + n + k \frac{n^2}{2^c}\right)\).

  So, @Index3@ is not strictly an improvement over @Index2@ if we look at memory
  complexity alone. However, /in practice/ @Index3@ is an improvement over
  @Index2@ if we can pick a \(c\) that is (i) much smaller than \(k\) and (ii)
  keeps \( \text{E}\left[\text{collision}~n~c\right] \) small (e.g. close to 1).
  For example, storing the first 64 bits of 100 million SHA256 hashes reduces
  memory size from \(256 n\) bits to \(64 n + n\) bits, because the expected
  number of collisions is smaller than 1.

  === Representing clashes and larger-than-page entries

  A complicating factor is the need to represent larger-than-page entries in
  the compact index. This need arises from the fact that we can have single
  key/value pairs that need more than one disk page to represent.

  One strategy would be to read the first page, discover that it is a
  multi-page entry and then read the subsequent pages. This would however
  double the disk I\/O latency and complicate the I\/O handling logic (which is
  non-trivial due to it being asynchronous).

  The strategy we use is to have the index be able to return the range of pages
  that need to be read, and then all pages can be read in one batch. This means
  the index must return not just individual page numbers but intervals, and
  with a representation capable of doing so. This is not implausible since we
  have an entry in the primary index for every disk page, and so we have
  entries both for the first and subsequent pages of a larger-than-page entry.
  The natural thing to do is have each of these subsequent primary index
  entries contain the same key prefix value. This means a binary search will
  find the /last/ entry in a run of equal prefix values.

  What leads to complexity is that we will /also/ get runs of equal values if
  we have clashes between pages (as discussed above). So in the general case
  we may have a run of equal values made up of a mixture of clashes and
  larger-than-page entries.

  So the general situation is that after a binary search we have found the
  end of what may turn out to be a run of clashes and larger-than-page values
  and we must disambiguate and return the appropriate single page (for the
  ordinary case) or an interval of pages (for the LTP case).

  To disambiguate we make use of the clash bits, and we make the choice to
  say that /all/ the entries for a LTP have their clash bit set, irrespective
  of whether the LTP is in fact involved in a clash. This may seem
  counter-intuitive but it leads to a simpler mathematical definition (below).

  The search algorithm involves searching backwards in the clash bits to find
  the beginning of the run of entries that are involved. To establish which
  entry within the run is the right one to return, we can consult the tie
  breaker map by looking for the biggest entry that is less than or equal to
  the full search key. This may then point to an index within the run of
  clashing entries, in which case this is the right entry, but it may also
  point to an earlier and thus irrelevant entry, in which case the first entry
  in the run is the right one.

  Note that this second case also covers the case of a single non-clashing LTP.

  Finally, to determine if the selected entry is an LTP and if so what interval
  of pages to return, we make use of a second bit vector of LTP \"overflow\"
  pages. This bit vector has 1 values for LTP overflow pages (i.e. the 2nd and
  subsequent pages) and 0 otherwise. We can then simply search forwards to find
  the length of the LTP (or 1 if it is not an LTP).

  === A semi-formal description of the compact index #rep-descr#

  * \(n\) is the number of pages
  * \(ps = \{p_i \mid 0 \leq i < n \}\) is a sorted set of pages
  * \(p^{min}_i\) is the full minimum key on a page \(p_i \in ps\).
  * \(p^{max}_i\) is the full maximum key on a page \(p_i \in ps\).
  * \(\texttt{topBits64}(k)\) extracts the \(64\) most significant bits from
    \(k\). We call these \(64\) bits the primary bits.
  * \(i \in \left[0, n \right)\), unless stated otherwise

  \[
  \begin{align*}
    P    :&~ \texttt{Array PageNo Word64} \\
    P[i] =&~ \texttt{topBits64}(p^{min}_i) \\
    \\
    C    :&~ \texttt{Array PageNo Bit} \\
    C[0] =&~ \texttt{false} \\
    C[i] =&~ \texttt{topBits64}(p^{max}_{i-1}) ~\texttt{==}~ \texttt{topBits64}(p^{min}_i) \\
    \\
    TB            :&~ \texttt{Map Key PageNo} \\
    TB(p^{min}_i) =&~
      \begin{cases}
        i                &, \text{if}~ C[i] \land \neg LTP[i] \\
        \text{undefined} &, \text{otherwise} \\
      \end{cases} \\
    \\
    LTP    :&~ \texttt{Array PageNo Bit} \\
    LTP[0] =&~ \texttt{false} \\
    LTP[i] =&~ p^{min}_{i-1} ~\texttt{==}~ p^{min}_i \\
  \end{align*}
  \]

  === An informal description of the search algorithm #search-descr#

  The easiest way to think about the search algorithm is that we start with the
  full interval of page numbers, and shrink it until we get to an interval that
  contains only a single page (or in case of a larger-than-page value, multiple
  pages that have the same minimum key). Assume @k@ is our search key.

  * Search \(P\) for the vector index @i@ that maps to the largest prim-bits
    value that is smaller or equal to the primary bits of @k@.
  * Check \(C\) if the page corresponding to @i@ is part of a
    clash, if it is not, then we are done!
  * If there is a clash, we go into clash recovery mode. This means we have to
    resolve the ambiguous page boundary using \(TB\). Note, however, that if
    there are multiple clashes in a row, there could be multiple ambiguous page
    boundaries that have to be resolved. We can achieve this using a three-step
    process:

      * Search \(TB\) for the vector index @j1@ that maps to the largest full
        key that is smaller than or equal to @k@.
      * Do a linear search backwards through \(C\) starting from @i@ to find the
        first page @j2@ that is not part of a clash.
      * Take the maximum of @j1@ or @j2@. Consider the two cases where either
        @j1@ or @j2@ is largest (@j1@ can not be equal to @j2@):

          * @j1@ is largest: we resolved ambiguous page boundaries "to the left"
            (toward lower vector indexes) until we resolved an ambiguous page
            boundary "to the right" (toward the current vector index).
          * @j2@ is largest: we resolved ambiguous page boundaries only "to the
            left", and ended up on a page that doesn't have a clash.

  * For larger-than-page values, the steps up until now would only provide us
    with the page where the larger-than-page value starts. We use \(LTP\) to do
    a linear search to find the page where the larger-than-page value ends.

  Convince yourself that clash recovery works without any larger-than-page
  values, and then consider the case where the index does contain
  larger-than-page values. Hints:

  \[
    \begin{align*}
    LTP[i] &~ \implies C[i] \\
    LTP[i] &~ \implies TB(p^{min}_i) = \text{undefined} \\
    \end{align*}
  \]
-}

-- | A compact fence-pointer index for uniformly distributed keys.
--
-- Compact indexes save space by storing bit-prefixes of keys.
--
-- See [a semi-formal description of the compact index](#rep-descr) for more
-- details about the representation.
--
-- While the semi-formal description mentions the number of pages \(n\),
-- we do not store it, as it can be inferred from the length of 'icPrimary'.
data IndexCompact = IndexCompact {
    -- | \(P\): Maps a page @i@ to the 64-bit slice of primary bits of its
    -- minimum key.
    icPrimary        :: {-# UNPACK #-} !(VU.Vector Word64)
    -- | \(C\): A clash on page @i@ means that the primary bits of the minimum
    -- key on that page aren't sufficient to decide whether a search for a key
    -- should continue left or right of the page.
  , icClashes        :: {-# UNPACK #-} !(VU.Vector Bit)
    -- | \(TB\): Maps a full minimum key to the page @i@ that contains it, but
    -- only if there is a clash on page @i@.
  , icTieBreaker     :: !(Map (Unsliced SerialisedKey) PageNo)
    -- | \(LTP\): Record of larger-than-page values. Given a span of pages for
    -- the larger-than-page value, the first page will map to 'False', and the
    -- remainder of the pages will be set to 'True'. Regular pages default to
    -- 'False'.
  , icLargerThanPage :: {-# UNPACK #-} !(VU.Vector Bit)
  }
  deriving stock (Show, Eq)

-- | Forces every field of the index, one after the other.
instance NFData IndexCompact where
  rnf (IndexCompact a b c d) = rnf a `seq` rnf b `seq` rnf c `seq` rnf d

{-------------------------------------------------------------------------------
  Queries
-------------------------------------------------------------------------------}

{-|
    For a specification of this operation, see the documentation of [its
    type-agnostic version]('Database.LSMTree.Internal.Index.search').

    See [an informal description of the search algorithm](#search-descr) for
    more details about the search algorithm.
-}
search :: SerialisedKey -> IndexCompact -> PageSpan
-- One way to think of the search algorithm is that it starts with the full page
-- number interval, and shrinks it to a minimal interval that contains the
-- search key. The code below is annotated with [x,y] or [x, y) comments that
-- describe the known page number interval at that point in the search
-- algorithm.
search k IndexCompact{..} =
    let !primbits = keyTopBits64 k in
    -- [0, n), where n is the length of the P array
    case unsafeSearchLE primbits icPrimary of
      Nothing ->
        -- TODO: if the P array is indeed empty, then this violates the
        -- guarantee that we return a valid page span! We should specify that a
        -- compact index should be non-empty.
        if VU.null icLargerThanPage then singlePage (PageNo 0) else
        -- [0, n), our page span definitely starts at 0, but we still have to
        -- consult the LTP array to check whether the value on page 0 overflows
        -- into subsequent pages.
        let !i = bitLongestPrefixFromTo (BoundExclusive 0) NoBound (Bit True) icLargerThanPage
        -- [0, i]
        in  multiPage (PageNo 0) (PageNo i)
      Just !i ->
        -- [0, i]
        if unBit $ icClashes VU.! i then
          -- [0, i], now in clash recovery mode.
          let -- i is the *last* index in a range of contiguous pages that all
              -- clash. Since i is the end of the range, we search backwards
              -- through the C array to find the start of this range.
              !i1 = PageNo $ fromMaybe 0 $
                bitIndexFromToRev (BoundInclusive 0) (BoundInclusive i) (Bit False) icClashes
              -- The TB map is consulted to find the largest tie-breaker key
              -- that is smaller than or equal to k. If there is no such key,
              -- we fall back to page 0, which never wins against i1.
              !i2 = maybe (PageNo 0) snd $
                Map.lookupLE (makeUnslicedKey k) icTieBreaker
              -- If i2 < i1, then it means the clashing pages were all just part
              -- of the same larger-than-page value. Entries are only included
              -- in the TB map if the clash was a *proper* clash.
              --
              -- If i1 <= i2, then there was a proper clash in [i1, i] that
              -- required a comparison with a tiebreaker key.
              PageNo !i3 = max i1 i2
              -- [max i1 i2, i], this is equivalent to taking the intersection
              -- of [i1, i] and [i2, i]
              !i4 = bitLongestPrefixFromTo (BoundExclusive i3) (BoundInclusive i) (Bit True) icLargerThanPage
          in  multiPage (PageNo i3) (PageNo i4)
              -- [i3, i4], we consulted the LTP array to check whether the value
              -- on page i3 overflows into subsequent pages
        else
          -- [i, i], there is no clash with the previous page and so this page
          -- is also not part of a large value that spans multiple pages.
          singlePage (PageNo i)


-- | The number of entries in the tie-breaker map ('icTieBreaker').
--
-- Note that this is not the number of set bits in 'icClashes': only clashes
-- that needed a full key to be resolved have an entry in the tie-breaker map.
countClashes :: IndexCompact -> Int
countClashes = Map.size . icTieBreaker

-- | 'True' if and only if the tie-breaker map ('icTieBreaker') is non-empty,
-- that is, if 'countClashes' would return a non-zero number.
hasClashes :: IndexCompact -> Bool
hasClashes = not . Map.null . icTieBreaker

{-|
    For a specification of this operation, see the documentation of [its
    type-agnostic version]('Database.LSMTree.Internal.Index.sizeInPages').

    The number of pages is not stored separately: it is the length of the
    primary array 'icPrimary'.
-}
sizeInPages :: IndexCompact -> NumPages
sizeInPages = NumPages . toEnum . VU.length . icPrimary

{-------------------------------------------------------------------------------
  Non-incremental serialisation
-------------------------------------------------------------------------------}

-- | Serialises a compact index in one go.
--
-- The output is the concatenation of the three parts that the incremental
-- interface produces separately: the header ('headerLBS'), the primary array
-- as a single chunk ('word64VectorToChunk'), and the trailer ('finalLBS').
toLBS :: NumEntries -> IndexCompact -> LBS.ByteString
toLBS numEntries index =
     headerLBS
  <> LBS.fromStrict (Chunk.toByteString (word64VectorToChunk (icPrimary index)))
  <> finalLBS numEntries index

{-------------------------------------------------------------------------------
  Incremental serialisation
-------------------------------------------------------------------------------}

-- | By writing out the type–version indicator in host endianness, we also
-- indicate endianness. During deserialisation, we would discover an endianness
-- mismatch.
supportedTypeAndVersion :: Word32
supportedTypeAndVersion = 0x0001

{-|
    For a specification of this operation, see the documentation of [its
    type-agnostic version]('Database.LSMTree.Internal.Index.headerLBS').

    The header consists of the type–version indicator followed by a zero
    'Word32', both in host endianness: 8 bytes in total.
-}
headerLBS :: LBS.ByteString
headerLBS =
    -- Create a single 8 byte chunk: the first buffer is sized to hold both
    -- 32-bit words, so the builder never has to start a second buffer.
    BB.toLazyByteStringWith (BB.safeStrategy 8 BB.smallChunkSize) mempty $
      BB.word32Host supportedTypeAndVersion <> BB.word32Host 0

{-|
    For a specification of this operation, see the documentation of [its
    type-agnostic version]('Database.LSMTree.Internal.Index.finalLBS').

    The trailer consists of, in this order: the clash bits, the
    larger-than-page bits, the tie-breaker map, the number of pages (the length
    of 'icPrimary') and the number of entries. The latter two are written as
    64-bit words in host endianness.
-}
finalLBS :: NumEntries -> IndexCompact -> LBS.ByteString
finalLBS (NumEntries numEntries) IndexCompact {..} =
    -- use a builder, since it is all relatively small
    BB.toLazyByteString $
         putBitVec icClashes
      <> putBitVec icLargerThanPage
      <> putTieBreaker icTieBreaker
      <> BB.word64Host (fromIntegral numPages)
      <> BB.word64Host (fromIntegral numEntries)
  where
    numPages :: Int
    numPages = VU.length icPrimary

-- | Constructs a chunk containing the contents of a vector of 64-bit words.
--
-- The vector's underlying byte array is reused: its offset and length, which
-- count 64-bit words, are rescaled with 'mul8' to describe a vector of bytes
-- over the same array.
word64VectorToChunk :: VU.Vector Word64 -> Chunk
word64VectorToChunk (VU.V_Word64 (VP.Vector off len ba)) =
    Chunk (mkPrimVector (mul8 off) (mul8 len) ba)

-- | Padded to 64 bit.
--
-- Assumes that the bitvector has a byte-aligned offset, and calls 'error'
-- otherwise.
--
-- Bits of the final byte that lie beyond the length of the bit vector are
-- masked out, so they are always written as zeroes.
putBitVec :: VU.Vector Bit -> BB.Builder
putBitVec (BitVec offsetBits lenBits ba)
  | mod8 offsetBits /= 0 = error "putBitVec: not byte aligned"
  | otherwise =
       -- first only write the bytes that are fully part of the bit vec
       byteArrayFromTo offsetBytes offsetLastByte ba
       -- then carefully write the last byte, might be partially uninitialised
    <> (if remainingBits /= 0 then
         BB.word8 (indexByteArray ba offsetLastByte .&. bitmaskLastByte)
       else
         mempty)
    <> putPaddingTo64 totalBytesWritten
  where
    -- Byte offset of the first byte of the bit vector.
    offsetBytes :: Int
    offsetBytes = div8 offsetBits
    -- Byte offset just past the last byte that is fully covered by the bit
    -- vector; this is the partially covered byte, if there is one.
    offsetLastByte :: Int
    offsetLastByte = offsetBytes + div8 lenBits
    -- Full bytes plus the partial byte (if any), before padding.
    totalBytesWritten :: Int
    totalBytesWritten = ceilDiv8 lenBits

    -- Keeps only the lowest 'remainingBits' bits of the last byte.
    bitmaskLastByte :: Word8
    bitmaskLastByte = unsafeShiftR 0xFF (8 - remainingBits)
    -- Number of bits of the bit vector that spill into a partial last byte.
    remainingBits :: Int
    remainingBits = mod8 lenBits

-- | Serialise the tie-breaker map, with each entry padded to 64 bit.
--
-- The output starts with the number of entries as a 64-bit word, followed by
-- the entries in ascending key order. Each entry consists of the page number
-- and the key size (both as 32-bit words), the key bytes, and zero padding up
-- to the next multiple of 8 bytes.
putTieBreaker :: Map (Unsliced SerialisedKey) PageNo -> BB.Builder
putTieBreaker tieBreaker =
       BB.word64Host (fromIntegral (Map.size tieBreaker))
    <> Map.foldMapWithKey putEntry tieBreaker
  where
    putEntry :: Unsliced SerialisedKey -> PageNo -> BB.Builder
    putEntry unslicedKey (PageNo pageNo) =
        mconcat
          [ BB.word32Host (fromIntegral pageNo)
          , BB.word32Host (fromIntegral keySize)
          , serialisedKey key
          , putPaddingTo64 keySize
          ]
      where
        key :: SerialisedKey
        key = fromUnslicedKey unslicedKey

        keySize :: Int
        keySize = sizeofKey key

-- | Given the number of bytes written so far, produce the zero bytes that
-- are needed to reach the next multiple of 8 bytes (nothing if the number is
-- already a multiple of 8).
putPaddingTo64 :: Int -> BB.Builder
putPaddingTo64 written
  | excess == 0 = mempty
  | otherwise   = mconcat (replicate (8 - excess) (BB.word8 0))
  where
    -- Number of bytes written beyond the last multiple of 8.
    excess :: Int
    excess = mod8 written

{-------------------------------------------------------------------------------
  Deserialisation
-------------------------------------------------------------------------------}

{-|
    For a specification of this operation, see the documentation of [its
    type-agnostic version]('Database.LSMTree.Internal.Index.fromSBS').

    The input is read as follows: a 32-bit type-and-version indicator at the
    start, then (from the second 64-bit word on) the primary array, the clash
    bit vector, the larger-than-page bit vector and the tie breaker, and
    finally a footer of two 64-bit words holding the number of pages and the
    number of entries. The components must account for the whole input.
-}
fromSBS :: ShortByteString -> Either String (NumEntries, IndexCompact)
fromSBS (SBS rawBytes) = do
    failWhen (mod8 totalBytes /= 0) "Length is not multiple of 64 bit"
    failWhen (totalBytes < 24) "Doesn't contain header and footer"

    -- check type and version
    failWhen (typeAndVersion == byteSwap32 supportedTypeAndVersion)
      "Non-matching endianness"
    failWhen (typeAndVersion /= supportedTypeAndVersion)
      "Unsupported type or version"

    -- read footer: the last two 64-bit words
    numPages   <- readSize (totalWords - 2)
    numEntries <- readSize (totalWords - 1)

    -- read the components; all offsets are in units of 64 bits, and the
    -- first component starts right after the type–version indicator
    (!clashesOff, icPrimary) <-
      getVec64 "Primary array" ba 1 numPages
    (!ltpOff, icClashes) <-
      getBitVec "Clash bit vector" ba clashesOff numPages
    (!tieBreakerOff, icLargerThanPage) <-
      getBitVec "LTP bit vector" ba ltpOff numPages
    (!footerOff, icTieBreaker) <-
      getTieBreaker ba tieBreakerOff

    -- the components plus the two footer words must cover the input exactly
    let bytesUsed = mul8 (footerOff + 2)
    failWhen (bytesUsed > totalBytes) "Byte array is too small for components"
    failWhen (bytesUsed < totalBytes) "Byte array is too large for components"

    pure (NumEntries numEntries, IndexCompact {..})
  where
    ba :: ByteArray
    ba = ByteArray rawBytes

    -- Size of the input in bytes and in 64-bit words, respectively.
    totalBytes, totalWords :: Int
    totalBytes = sizeofByteArray ba
    totalWords = div8 totalBytes

    -- Only demanded after the length checks have passed.
    typeAndVersion :: Word32
    typeAndVersion = indexByteArray ba 0

    -- Abort with the given message if the condition holds.
    failWhen :: Bool -> String -> Either String ()
    failWhen cond msg = when cond (Left msg)

    -- Read the 64-bit word at the given index (in units of 64 bits) and
    -- convert it to an 'Int', failing if it does not fit.
    readSize :: Int -> Either String Int
    readSize wordIx
      | raw > fromIntegral (maxBound :: Int) =
          Left "Size information is too large for Int"
      | otherwise =
          Right (fromIntegral raw)
      where
        raw :: Word64
        raw = indexByteArray ba wordIx

type Offset32 = Int
type Offset64 = Int

-- | Read a vector of 64-bit words from the byte array, with bounds checking.
--
-- The offset and the number of entries are both in units of 64-bit words (the
-- offset is passed unchanged to 'checkedPrimVec' at element type 'Word64').
-- On success, the offset just past the vector is returned alongside it. The
-- given name is only used in the error message.
getVec64 ::
     String -> ByteArray -> Offset32 -> Int
  -> Either String (Offset64, VU.Vector Word64)
getVec64 name ba off64 numEntries =
    maybe outOfBounds inBounds (checkedPrimVec off64 numEntries ba)
  where
    outOfBounds = Left (name <> " is out of bounds")
    inBounds words64 = Right (off64 + numEntries, VU.V_Word64 words64)

-- | Read a bit vector from the byte array, with bounds checking.
--
-- The offset is in units of 64 bits, the number of entries is in bits. On
-- success, the returned offset is advanced by the number of 64-bit words
-- needed to hold the bits (rounded up). The given name is only used in the
-- error message.
getBitVec ::
     String -> ByteArray -> Offset64 -> Int
  -> Either String (Offset64, VU.Vector Bit)
getBitVec name ba off numEntries =
    maybe outOfBounds inBounds (checkedBitVec (mul64 off) numEntries ba)
  where
    outOfBounds = Left (name <> " is out of bounds")
    inBounds bits = Right (off + ceilDiv64 numEntries, bits)

-- | Read the tie-breaker map starting at the given offset (in units of 64
-- bits), checking bounds along the way. Returns the offset just past the
-- last entry together with the map.
--
-- Inefficient, but okay for a small number of entries.
getTieBreaker ::
     ByteArray -> Offset64
  -> Either String (Offset64, Map (Unsliced SerialisedKey) PageNo)
getTieBreaker ba = \startOff -> do
    when (mul8 startOff >= sizeofByteArray ba) $
      Left "Tie breaker is out of bounds"
    -- The first 64-bit word holds the number of entries.
    let entryCount = fromIntegral (indexByteArray ba startOff :: Word64)
    (endOff, entries) <- readEntries entryCount (startOff + 1) []
    pure (endOff, Map.fromList entries)
  where
    -- Read the given number of entries, accumulating them in reverse order.
    readEntries ::
         Int -> Offset64 -> [(Unsliced SerialisedKey, PageNo)]
      -> Either String (Offset64, [(Unsliced SerialisedKey, PageNo)])
    readEntries 0 off acc = Right (off, acc)
    readEntries remaining off acc
      | mul8 off >= sizeofByteArray ba =
          Left "Clash map entry is out of bounds"
      | otherwise = do
          -- An entry starts with two 32-bit words: page number and key size.
          let off32 = mul2 off
          let !pageNo = fromIntegral (indexByteArray ba off32 :: Word32)
          let keyLen8 = fromIntegral (indexByteArray ba (off32 + 1) :: Word32)
          (off', key) <- readKey (off + 1) keyLen8
          readEntries (remaining - 1) off' ((key, PageNo pageNo) : acc)

    -- Read a key of the given size in bytes, starting at the given offset
    -- (in units of 64 bits). The returned offset skips the key including its
    -- padding to 64 bits.
    readKey :: Offset64 -> Int -> Either String (Offset64, Unsliced SerialisedKey)
    readKey off len8 =
        case checkedPrimVec (mul8 off) len8 ba of
          Nothing ->
            Left "Clash map key is out of bounds"
          Just keyBytes ->
            -- We avoid retaining references to the bytearray.
            -- Probably not needed, since the bytearray will stay alive as
            -- long as the compact index anyway, but we expect very few keys
            -- in the tie breaker, so it is cheap and we don't have to worry
            -- about it any more.
            let !key = SerialisedKey' keyBytes
            in  Right (off + ceilDiv8 len8, makeUnslicedKey key)

-- | Create a primitive vector over a region of the byte array, or return
-- 'Nothing' if the region is not fully contained in the byte array.
--
-- Offset and length are in number of elements. Both must be non-negative.
--
-- The bounds check is phrased in terms of the number of elements that fit
-- into the byte array, so that it cannot be defeated by arithmetic overflow
-- when the length comes from untrusted input (e.g., a very large length for
-- which @sizeOf * (off + len)@ would wrap around).
checkedPrimVec :: forall a.
  VP.Prim a => Int -> Int -> ByteArray -> Maybe (VP.Vector a)
checkedPrimVec off len ba
  | off >= 0, len >= 0
  , off <= capacity
    -- @capacity - off@ is within @[0, capacity]@ here, so no overflow.
  , len <= capacity - off =
      Just (mkPrimVector off len ba)
  | otherwise =
      Nothing
  where
    -- Number of whole elements of type @a@ that fit into the byte array.
    capacity :: Int
    capacity = sizeofByteArray ba `quot` sizeOf (undefined :: a)

-- | Create a bit vector over a region of the byte array, or return 'Nothing'
-- if the region is not fully contained in the byte array.
--
-- Offset and length are in number of bits. Both must be non-negative.
--
-- We can't use 'checkedPrimVec' here, since 'Bool' and 'Bit' are not 'VP.Prim'
-- (so the bit vector type doesn't use 'VP.Vector' under the hood).
--
-- The bounds check avoids computing @off + len@, which could overflow for a
-- very large length coming from untrusted input.
checkedBitVec :: Int -> Int -> ByteArray -> Maybe (VU.Vector Bit)
checkedBitVec off len ba
  | off >= 0, len >= 0
  , off <= totalBits
    -- @totalBits - off@ is within @[0, totalBits]@ here, so no overflow.
  , len <= totalBits - off =
      Just (BitVec off len ba)
  | otherwise =
      Nothing
  where
    -- Size of the byte array in bits.
    totalBits :: Int
    totalBits = mul8 (sizeofByteArray ba)

{-------------------------------------------------------------------------------
 Vector extras
-------------------------------------------------------------------------------}

-- | Find the largest vector element that is smaller than or equal to the
-- given one, and return its vector index. Returns 'Nothing' if there is no
-- such element.
--
-- Note: this function uses 'unsafeThaw', so all considerations for using
-- 'unsafeThaw' apply to using 'unsafeSearchLE' too.
--
-- PRECONDITION: the vector is sorted in ascending order.
unsafeSearchLE ::
     (VG.Vector v e, Ord e)
  => e -> v e ->  Maybe Int -- TODO: return -1?
unsafeSearchLE e vec = runST $ do
    -- Vector search algorithms work on mutable vectors only.
    mvec <- VG.unsafeThaw vec
    -- The first index whose element is strictly greater than @e@.
    firstGreater <- VA.gallopingSearchLeftP (> e) mvec
    pure (predecessor firstGreater)
  where
    -- The last (and therefore largest) element that is lesser-equal @e@ sits
    -- right before the first strictly greater one. If the latter is at index
    -- 0, then no element is lesser-equal @e@.
    predecessor :: Int -> Maybe Int
    predecessor 0 = Nothing
    predecessor i = Just (i - 1)

-- | Return the index of the last bit with the specified value within the
-- given lower and upper bound, if any. The index is into the original vector,
-- not the searched slice.
--
-- TODO(optimise): optimise by implementing this function similarly to how
-- 'bitIndex' is implemented internally. Another alternative I tried is using
-- the @vector-rotvec@ package and 'V.elemIndex', but 'V.elemIndex' is up to 64x
-- slower than bitIndex.
bitIndexFromToRev :: Bound Int -> Bound Int -> Bit -> VU.Vector Bit -> Maybe Int
bitIndexFromToRev lb ub b v =
    -- Position @x@ in the reversed window corresponds to index @hi - x@ in
    -- the original vector.
    fmap (hi -) (bitIndex b reversedWindow)
  where
    lo, hi :: Int
    lo = vectorLowerBound lb
    hi = vectorUpperBound v ub

    -- The inclusive range @[lo, hi]@ of the vector, back to front.
    reversedWindow :: VU.Vector Bit
    reversedWindow = VU.reverse (VU.slice lo (hi - lo + 1) v)

-- | Like 'bitIndex', but only searches the vector within the given lower and
-- upper bound. Returns the index into the original vector, not the slice.
bitIndexFromTo :: Bound Int -> Bound Int -> Bit -> VU.Vector Bit -> Maybe Int
bitIndexFromTo lb ub b v =
    -- Shift the position within the window back to an index into @v@.
    fmap (lo +) (bitIndex b window)
  where
    lo, hi :: Int
    lo = vectorLowerBound lb
    hi = vectorUpperBound v ub

    -- The inclusive range @[lo, hi]@ of the vector.
    window :: VU.Vector Bit
    window = VU.slice lo (hi - lo + 1) v

-- | Find the longest prefix of the vector (within the given bounds) that
-- consists of only bits matching the given value. The return value is the
-- index of the last bit in the prefix.
--
-- If no bit within the bounds differs from the given value, the result is
-- the upper bound; otherwise it is the index right before the first
-- differing bit.
bitLongestPrefixFromTo :: Bound Int -> Bound Int -> Bit -> Vector Bit -> Int
bitLongestPrefixFromTo lb ub b v =
    case bitIndexFromTo lb ub (opposite b) v of
      Nothing            -> vectorUpperBound v ub
      Just firstMismatch -> firstMismatch - 1
  where
    opposite :: Bit -> Bit
    opposite (Bit x) = Bit (not x)

-- | Translate a lower bound into the first vector index that lies within
-- the bound. Without a bound, that is index 0.
vectorLowerBound :: Bound Int -> Int
vectorLowerBound NoBound            = 0
vectorLowerBound (BoundExclusive i) = i + 1
vectorLowerBound (BoundInclusive i) = i

-- | Translate an upper bound into the last vector index that lies within the
-- bound. Without a bound, that is the last index of the given vector.
vectorUpperBound :: VG.Vector v a => v a -> Bound Int -> Int
vectorUpperBound v bound =
    case bound of
      NoBound          -> VG.length v - 1
      BoundExclusive i -> i - 1
      BoundInclusive i -> i