{-# LANGUAGE CPP #-}
{-# OPTIONS_HADDOCK not-home #-}

-- |
-- Incremental construction of a compact index yields chunks of the primary array
-- that can be serialised incrementally.
--
-- Incremental construction is an 'ST' computation that can be started using
-- 'new', returning an 'IndexCompactAcc' structure that accumulates internal
-- state. 'append'ing new pages to the 'IndexCompactAcc' /might/ yield 'Chunk's.
-- Incremental construction can be finalised with 'unsafeEnd', which yields both
-- a 'Chunk' (possibly) and the 'IndexCompact'.
--
module Database.LSMTree.Internal.Index.CompactAcc (
    -- * Construction
    IndexCompactAcc (..)
  , new
  , newWithDefaults
  , appendSingle
  , appendMulti
  , unsafeEnd
    -- * Internal: exported for testing and benchmarking
  , SMaybe (..)
  , unsafeWriteRange
  , vectorLowerBound
  , mvectorUpperBound
  ) where

#ifdef NO_IGNORE_ASSERTS
import           Control.Exception (assert)
#endif

import           Control.Monad (when)
import           Control.Monad.ST.Strict
import           Data.Bit hiding (flipBit)
import           Data.List.NonEmpty (NonEmpty)
import qualified Data.List.NonEmpty as NE
import           Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import           Data.Primitive.ByteArray (newPinnedByteArray, setByteArray)
import           Data.STRef.Strict
import qualified Data.Vector.Generic.Mutable as VGM
import qualified Data.Vector.Primitive.Mutable as VPM
import qualified Data.Vector.Unboxed as VU
import qualified Data.Vector.Unboxed.Mutable as VUM
import           Data.Word
import           Database.LSMTree.Internal.BitMath
import           Database.LSMTree.Internal.Chunk (Chunk)
import           Database.LSMTree.Internal.Index.Compact
import           Database.LSMTree.Internal.Map.Range (Bound (..))
import           Database.LSMTree.Internal.Page
import           Database.LSMTree.Internal.Serialise
import           Database.LSMTree.Internal.Unsliced

{-------------------------------------------------------------------------------
  Construction
-------------------------------------------------------------------------------}

-- | A mutable version of 'IndexCompact'. See [incremental
-- construction](#incremental).
data IndexCompactAcc s = IndexCompactAcc {
    -- * Core index structure
    -- | Accumulates pinned chunks of 'ciPrimary'.
    --
    -- The head of the list is the chunk that is currently being written to.
    icaPrimary           :: !(STRef s (NonEmpty (VU.MVector s Word64)))
    -- | Accumulates chunks of 'ciClashes'.
    --
    -- The head of the list is the chunk that is currently being written to.
  , icaClashes           :: !(STRef s (NonEmpty (VU.MVector s Bit)))
    -- | Accumulates the 'ciTieBreaker'.
  , icaTieBreaker        :: !(STRef s (Map (Unsliced SerialisedKey) PageNo))
    -- | Accumulates chunks of 'ciLargerThanPage'.
    --
    -- The head of the list is the chunk that is currently being written to.
  , icaLargerThanPage    :: !(STRef s (NonEmpty (VU.MVector s Bit)))

    -- * Aux information required for incremental construction
    -- | Maximum size of a chunk, counted in entries (one entry per page).
  , icaMaxChunkSize      :: !Int
    -- | The number of the current disk page we are constructing the index for.
  , icaCurrentPageNumber :: !(STRef s Int)
    -- | The primary bits of the page-maximum key that we saw last.
    --
    -- This should be 'SNothing' if we haven't seen any keys/pages yet.
  , icaLastMaxPrimbits   :: !(STRef s (SMaybe Word64))
    -- | The full minimum key of the page that we saw last.
    --
    -- This should be 'SNothing' if we haven't seen any keys/pages yet.
  , icaLastMinKey        :: !(STRef s (SMaybe SerialisedKey))
  }

-- | @'new' maxcsize@ creates a new mutable index with a maximum chunk size of
-- @maxcsize@.
--
-- PRECONDITION: maxcsize > 0
--
-- Note: after initialisation, @maxcsize@ can no longer be changed.
new :: Int -> ST s (IndexCompactAcc s)
new maxcsize = IndexCompactAcc
    -- Core index structure: every chunked component starts out with a single
    -- chunk of @maxcsize@ entries; the tie-breaker map starts out empty.
    <$> (newSTRef . pure =<< newPinnedMVec64 maxcsize)
    <*> (newSTRef . pure =<< VUM.new maxcsize)
    <*> newSTRef Map.empty
    <*> (newSTRef . pure =<< VUM.new maxcsize)
    -- Aux information required for incremental construction: the chunk size
    -- is fixed, the page counter starts at 0, and no keys have been seen yet.
    <*> pure maxcsize
    <*> newSTRef 0
    <*> newSTRef SNothing
    <*> newSTRef SNothing

-- | We explicitly pin the byte arrays, since that allows for more efficient
-- serialisation, as the definition of 'isByteArrayPinned' changed in GHC 9.6,
-- see <https://gitlab.haskell.org/ghc/ghc/-/issues/22255>.
--
-- TODO: remove this workaround once a solution exists, e.g. a new primop that
-- allows checking for implicit pinning.
newPinnedMVec64 :: Int -> ST s (VUM.MVector s Word64)
newPinnedMVec64 lenWords = do
    -- The allocation size is given in bytes, hence the 'mul8' on the number
    -- of 64-bit words.
    mba <- newPinnedByteArray (mul8 lenWords)
    -- Initialise all @lenWords@ elements to 0.
    setByteArray mba 0 lenWords (0 :: Word64)
    -- Wrap the byte array as an unboxed mutable vector covering the whole
    -- array: offset 0, length @lenWords@.
    pure (VUM.MV_Word64 (VPM.MVector 0 lenWords mba))

{-|
    For a specification of this operation, see the documentation of [its
    type-agnostic version]('Database.LSMTree.Internal.Index.newWithDefaults').
-}
newWithDefaults :: ST s (IndexCompactAcc s)
newWithDefaults = new 1024 -- default maximum chunk size

{-|
    For a specification of this operation, see the documentation of [its
    type-agnostic version]('Database.LSMTree.Internal.Index.appendSingle').
-}
appendSingle :: forall s. (SerialisedKey, SerialisedKey) -> IndexCompactAcc s -> ST s (Maybe Chunk)
appendSingle (minKey, maxKey) ica@IndexCompactAcc{..} = do
#ifdef NO_IGNORE_ASSERTS
    lastMinKey <- readSTRef icaLastMinKey
    assert (minKey <= maxKey && smaybe True (<= minKey) lastMinKey) $ pure ()  -- sorted
#endif
    pageNo <- readSTRef icaCurrentPageNumber
    -- Position of this page inside the chunk that is currently being filled
    let ix = pageNo `mod` icaMaxChunkSize
    goAppend pageNo ix
    -- Bump the page counter before calling 'yield', which inspects it to
    -- decide whether the current chunk is full.
    writeSTRef icaCurrentPageNumber $! pageNo + 1
    yield ica
  where
    -- Primary bits of the minimum and maximum key on the page
    minPrimbits, maxPrimbits :: Word64
    minPrimbits = keyTopBits64 minKey
    maxPrimbits = keyTopBits64 maxKey

    -- | Meat of the function
    goAppend ::
         Int -- ^ Current /global/ page number
      -> Int -- ^ Current /local/ page number (inside the current chunk)
      -> ST s ()
    goAppend pageNo ix = do
        writePrimary
        writeClashesAndLTP
      where
        -- | Set value in primary vector
        writePrimary :: ST s ()
        writePrimary =
            readSTRef icaPrimary >>= \cs -> VUM.write (NE.head cs) ix minPrimbits

        -- | Set value in clash vector, tie-breaker map and larger-than-page
        -- vector
        writeClashesAndLTP :: ST s ()
        writeClashesAndLTP = do
            -- A clash occurs when the primary bits of this page's minimum key
            -- equal those of the previous page's maximum key.
            lastMaxPrimbits <- readSTRef icaLastMaxPrimbits
            let clash = lastMaxPrimbits == SJust minPrimbits
            writeSTRef icaLastMaxPrimbits $! SJust maxPrimbits

            -- The page is marked larger-than-page when its full minimum key
            -- equals the minimum key of the previous page.
            lastMinKey <- readSTRef icaLastMinKey
            let ltp = SJust minKey == lastMinKey
            writeSTRef icaLastMinKey $! SJust minKey

            readSTRef icaClashes >>= \cs -> VUM.write (NE.head cs) ix (Bit clash)
            readSTRef icaLargerThanPage >>= \cs -> VUM.write (NE.head cs) ix (Bit ltp)
            -- Only clashes that are not larger-than-page entries get a
            -- tie-breaker entry mapping the full minimum key to its page.
            when (clash && not ltp) $
              modifySTRef' icaTieBreaker (Map.insert (makeUnslicedKey minKey) (PageNo pageNo))

{-|
    For a specification of this operation, see the documentation of [its
    type-agnostic version]('Database.LSMTree.Internal.Index.appendMulti').
-}
appendMulti :: forall s. (SerialisedKey, Word32) -> IndexCompactAcc s -> ST s [Chunk]
appendMulti (k, n0) ica@IndexCompactAcc{..} =
    -- The first page is appended like a regular page whose minimum and
    -- maximum key are both @k@; the @n0@ overflow pages follow. A chunk
    -- yielded by the first page is put in front of the chunks yielded by the
    -- overflow pages.
    maybe id (:) <$> appendSingle (k, k) ica <*> overflows (fromIntegral n0)
  where
    minPrimbits :: Word64
    minPrimbits = keyTopBits64 k

    -- | Fill primary, clash and LTP vectors for a larger-than-page value. Yields
    -- chunks if necessary
    overflows :: Int -> ST s [Chunk]
    overflows n
      | n <= 0 = pure []
      | otherwise = do
          pageNo <- readSTRef icaCurrentPageNumber
          let ix = pageNo `mod` icaMaxChunkSize -- will be 0 in recursive calls
              -- Number of overflow pages that still fit in the current chunk
              remInChunk = min n (icaMaxChunkSize - ix)
              -- Range of local page numbers written in this step
              lo = BoundInclusive ix
              hi = BoundExclusive (ix + remInChunk)
          readSTRef icaPrimary >>= \cs ->
            unsafeWriteRange (NE.head cs) lo hi minPrimbits
          -- Overflow pages are always marked as both clashing and
          -- larger-than-page.
          readSTRef icaClashes >>= \cs ->
            unsafeWriteRange (NE.head cs) lo hi (Bit True)
          readSTRef icaLargerThanPage >>= \cs ->
            unsafeWriteRange (NE.head cs) lo hi (Bit True)
          writeSTRef icaCurrentPageNumber $! pageNo + remInChunk
          res <- yield ica
          -- Continue with whatever did not fit into the current chunk
          maybe id (:) res <$> overflows (n - remInChunk)

-- | Yield a chunk and start a new one if the current chunk is already full.
--
-- TODO(optimisation): yield will eagerly allocate new mutable vectors, but
-- maybe that should be done lazily.
--
-- INVARIANTS: see [construction invariants](#construction-invariants).
yield :: IndexCompactAcc s -> ST s (Maybe Chunk)
yield :: forall s. IndexCompactAcc s -> ST s (Maybe Chunk)
yield IndexCompactAcc{Int
STRef s Int
STRef s (NonEmpty (MVector s Word64))
STRef s (NonEmpty (MVector s Bit))
STRef s (Map (Unsliced SerialisedKey) PageNo)
STRef s (SMaybe Word64)
STRef s (SMaybe SerialisedKey)
icaPrimary :: forall s.
IndexCompactAcc s -> STRef s (NonEmpty (MVector s Word64))
icaClashes :: forall s. IndexCompactAcc s -> STRef s (NonEmpty (MVector s Bit))
icaTieBreaker :: forall s.
IndexCompactAcc s -> STRef s (Map (Unsliced SerialisedKey) PageNo)
icaLargerThanPage :: forall s. IndexCompactAcc s -> STRef s (NonEmpty (MVector s Bit))
icaMaxChunkSize :: forall s. IndexCompactAcc s -> Int
icaCurrentPageNumber :: forall s. IndexCompactAcc s -> STRef s Int
icaLastMaxPrimbits :: forall s. IndexCompactAcc s -> STRef s (SMaybe Word64)
icaLastMinKey :: forall s. IndexCompactAcc s -> STRef s (SMaybe SerialisedKey)
icaPrimary :: STRef s (NonEmpty (MVector s Word64))
icaClashes :: STRef s (NonEmpty (MVector s Bit))
icaTieBreaker :: STRef s (Map (Unsliced SerialisedKey) PageNo)
icaLargerThanPage :: STRef s (NonEmpty (MVector s Bit))
icaMaxChunkSize :: Int
icaCurrentPageNumber :: STRef s Int
icaLastMaxPrimbits :: STRef s (SMaybe Word64)
icaLastMinKey :: STRef s (SMaybe SerialisedKey)
..} = do
    Int
pageNo <- STRef s Int -> ST s Int
forall s a. STRef s a -> ST s a
readSTRef STRef s Int
icaCurrentPageNumber
    if Int
pageNo Int -> Int -> Int
forall a. Integral a => a -> a -> a
`mod` Int
icaMaxChunkSize Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
0 then do -- The current chunk is full
      Vector Word64
primaryChunk <- MVector s Word64 -> ST s (Vector Word64)
MVector (PrimState (ST s)) Word64 -> ST s (Vector Word64)
forall a (m :: * -> *).
(Unbox a, PrimMonad m) =>
MVector (PrimState m) a -> m (Vector a)
VU.unsafeFreeze (MVector s Word64 -> ST s (Vector Word64))
-> (NonEmpty (MVector s Word64) -> MVector s Word64)
-> NonEmpty (MVector s Word64)
-> ST s (Vector Word64)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. NonEmpty (MVector s Word64) -> MVector s Word64
forall a. NonEmpty a -> a
NE.head (NonEmpty (MVector s Word64) -> ST s (Vector Word64))
-> ST s (NonEmpty (MVector s Word64)) -> ST s (Vector Word64)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< STRef s (NonEmpty (MVector s Word64))
-> ST s (NonEmpty (MVector s Word64))
forall s a. STRef s a -> ST s a
readSTRef STRef s (NonEmpty (MVector s Word64))
icaPrimary
      STRef s (NonEmpty (MVector s Word64))
-> (NonEmpty (MVector s Word64) -> NonEmpty (MVector s Word64))
-> ST s ()
forall s a. STRef s a -> (a -> a) -> ST s ()
modifySTRef' STRef s (NonEmpty (MVector s Word64))
icaPrimary ((NonEmpty (MVector s Word64) -> NonEmpty (MVector s Word64))
 -> ST s ())
-> (MVector s Word64
    -> NonEmpty (MVector s Word64) -> NonEmpty (MVector s Word64))
-> MVector s Word64
-> ST s ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. MVector s Word64
-> NonEmpty (MVector s Word64) -> NonEmpty (MVector s Word64)
forall a. a -> NonEmpty a -> NonEmpty a
NE.cons (MVector s Word64 -> ST s ()) -> ST s (MVector s Word64) -> ST s ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Int -> ST s (MVector s Word64)
forall s. Int -> ST s (MVector s Word64)
newPinnedMVec64 Int
icaMaxChunkSize
      STRef s (NonEmpty (MVector s Bit))
-> (NonEmpty (MVector s Bit) -> NonEmpty (MVector s Bit))
-> ST s ()
forall s a. STRef s a -> (a -> a) -> ST s ()
modifySTRef' STRef s (NonEmpty (MVector s Bit))
icaClashes ((NonEmpty (MVector s Bit) -> NonEmpty (MVector s Bit)) -> ST s ())
-> (MVector s Bit
    -> NonEmpty (MVector s Bit) -> NonEmpty (MVector s Bit))
-> MVector s Bit
-> ST s ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. MVector s Bit
-> NonEmpty (MVector s Bit) -> NonEmpty (MVector s Bit)
forall a. a -> NonEmpty a -> NonEmpty a
NE.cons (MVector s Bit -> ST s ()) -> ST s (MVector s Bit) -> ST s ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Int -> ST s (MVector (PrimState (ST s)) Bit)
forall (m :: * -> *) a.
(PrimMonad m, Unbox a) =>
Int -> m (MVector (PrimState m) a)
VUM.new Int
icaMaxChunkSize
      STRef s (NonEmpty (MVector s Bit))
-> (NonEmpty (MVector s Bit) -> NonEmpty (MVector s Bit))
-> ST s ()
forall s a. STRef s a -> (a -> a) -> ST s ()
modifySTRef' STRef s (NonEmpty (MVector s Bit))
icaLargerThanPage ((NonEmpty (MVector s Bit) -> NonEmpty (MVector s Bit)) -> ST s ())
-> (MVector s Bit
    -> NonEmpty (MVector s Bit) -> NonEmpty (MVector s Bit))
-> MVector s Bit
-> ST s ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. MVector s Bit
-> NonEmpty (MVector s Bit) -> NonEmpty (MVector s Bit)
forall a. a -> NonEmpty a -> NonEmpty a
NE.cons (MVector s Bit -> ST s ()) -> ST s (MVector s Bit) -> ST s ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Int -> ST s (MVector (PrimState (ST s)) Bit)
forall (m :: * -> *) a.
(PrimMonad m, Unbox a) =>
Int -> m (MVector (PrimState m) a)
VUM.new Int
icaMaxChunkSize
      Maybe Chunk -> ST s (Maybe Chunk)
forall a. a -> ST s a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe Chunk -> ST s (Maybe Chunk))
-> Maybe Chunk -> ST s (Maybe Chunk)
forall a b. (a -> b) -> a -> b
$ Chunk -> Maybe Chunk
forall a. a -> Maybe a
Just (Vector Word64 -> Chunk
word64VectorToChunk Vector Word64
primaryChunk)
    else -- the current chunk is not yet full
      Maybe Chunk -> ST s (Maybe Chunk)
forall a. a -> ST s a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe Chunk
forall a. Maybe a
Nothing

{-|
    For a specification of this operation, see the documentation of [its
    type-agnostic version]('Database.LSMTree.Internal.Index.unsafeEnd').
-}
unsafeEnd :: IndexCompactAcc s -> ST s (Maybe Chunk, IndexCompact)
unsafeEnd IndexCompactAcc{..} = do
    pageNo <- readSTRef icaCurrentPageNumber
    -- Offset of the current page number within the current chunk; @0@ is
    -- treated below as "the current chunk holds no entries".
    let ix = pageNo `mod` icaMaxChunkSize

    -- Freeze all chunks, keeping only the filled prefix of the current one.
    -- The vectors are frozen without copying ('VU.unsafeFreeze').
    chunksPrimary <-
      traverse VU.unsafeFreeze . sliceCurrent ix =<< readSTRef icaPrimary
    chunksClashes <-
      traverse VU.unsafeFreeze . sliceCurrent ix =<< readSTRef icaClashes
    chunksLargerThanPage <-
      traverse VU.unsafeFreeze . sliceCurrent ix =<< readSTRef icaLargerThanPage

    -- Only slice out a chunk if there are entries in the chunk. When
    -- @ix /= 0@, 'sliceCurrent' puts the filled prefix of the current chunk at
    -- the front of the list, so matching on the list is total where 'head'
    -- would be partial.
    let mchunk = case chunksPrimary of
          current : _ | ix /= 0 -> Just (word64VectorToChunk current)
          _                     -> Nothing

    -- The chunk lists have the most recent chunk first, so they are reversed
    -- before being concatenated into the final vectors.
    let icPrimary = VU.concat . reverse $ chunksPrimary
    let icClashes = VU.concat . reverse $ chunksClashes
    let icLargerThanPage = VU.concat . reverse $ chunksLargerThanPage
    icTieBreaker <- readSTRef icaTieBreaker

    pure (mchunk, IndexCompact {..})
  where
    -- The current (most recent) chunk of the bitvectors is only partially
    -- constructed, so we need to only use the part that is already filled.
    sliceCurrent :: VU.Unbox e => Int -> NonEmpty (VU.MVector t e) -> [VU.MVector t e]
    sliceCurrent ix (c NE.:| cs)
      | ix == 0 = cs  -- current chunk is completely empty, just ignore it
      | otherwise = VUM.slice 0 ix c : cs

{-------------------------------------------------------------------------------
  Strict 'Maybe'
-------------------------------------------------------------------------------}

-- | A strict variant of 'Maybe': the field of 'SJust' carries a strictness
-- annotation, so its payload is evaluated when the constructor is.
data SMaybe a = SNothing | SJust !a
  deriving stock (Eq, Show)

#ifdef NO_IGNORE_ASSERTS
-- | Eliminator for 'SMaybe', analogous to 'maybe': returns the first argument
-- for 'SNothing' and applies the second argument to the payload of 'SJust'.
smaybe :: b -> (a -> b) -> SMaybe a -> b
smaybe onNothing _      SNothing      = onNothing
smaybe _         onJust (SJust value) = onJust value
#endif

{-------------------------------------------------------------------------------
 Vector extras
-------------------------------------------------------------------------------}

-- | Set every element of the vector within the given bounds to the given
-- value.
--
-- Both bounds are first converted to inclusive indices using
-- 'vectorLowerBound' and 'mvectorUpperBound'; all indices from the lower to
-- the upper index (inclusive) are overwritten.
--
-- The range is taken with 'VUM.unsafeSlice', which performs no bounds checks:
-- it is up to the caller to make sure the range lies within the vector.
unsafeWriteRange :: VU.Unbox a => VU.MVector s a -> Bound Int -> Bound Int -> a -> ST s ()
unsafeWriteRange !v !lb !ub !x = VUM.set (VUM.unsafeSlice lb' len v) x
  where
    -- Inclusive lower and upper indices of the range to overwrite.
    !lb' = vectorLowerBound lb
    !ub' = mvectorUpperBound v ub
    -- Number of elements in the inclusive range @[lb', ub']@.
    !len = ub' - lb' + 1

-- | Map a 'Bound' to the equivalent inclusive lower bound.
--
-- 'NoBound' maps to the first index @0@, an exclusive bound @i@ maps to
-- @i + 1@, and an inclusive bound maps to itself.
vectorLowerBound :: Bound Int -> Int
vectorLowerBound = \case
    NoBound          -> 0
    BoundExclusive i -> i + 1
    BoundInclusive i -> i

-- | Map a 'Bound' to the equivalent inclusive upper bound.
--
-- 'NoBound' maps to the last index of the given vector (its length minus
-- one), an exclusive bound @i@ maps to @i - 1@, and an inclusive bound maps to
-- itself. The vector is only consulted for its length.
mvectorUpperBound :: VGM.MVector v a => v s a -> Bound Int -> Int
mvectorUpperBound v = \case
    NoBound          -> VGM.length v - 1
    BoundExclusive i -> i - 1
    BoundInclusive i -> i