{-# OPTIONS_HADDOCK not-home #-}
{- HLINT ignore "Avoid restricted alias" -}

{-|
    A general-purpose fence pointer index.

    Keys used with an ordinary index must be smaller than 64 KiB.
-}
module Database.LSMTree.Internal.Index.Ordinary
(
    IndexOrdinary (IndexOrdinary),
    toUnslicedLastKeys,
    search,
    sizeInPages,
    headerLBS,
    finalLBS,
    fromSBS
)
where

import           Prelude hiding (drop, last, length)

import           Control.DeepSeq (NFData (rnf))
import           Control.Exception (assert)
import           Control.Monad (when)
import           Data.ByteString.Builder (toLazyByteString)
import           Data.ByteString.Builder.Extra (word32Host, word64Host)
import           Data.ByteString.Lazy (LazyByteString)
import           Data.ByteString.Short (ShortByteString (SBS))
import qualified Data.ByteString.Short as ShortByteString (length)
import           Data.Primitive.ByteArray (ByteArray (ByteArray),
                     indexByteArray)
import           Data.Vector (Vector, drop, findIndex, findIndexR, fromList,
                     last, length, (!))
import qualified Data.Vector.Primitive as Primitive (Vector (Vector), drop,
                     force, length, null, splitAt, take)
import           Data.Word (Word16, Word32, Word64, Word8, byteSwap32)
import           Database.LSMTree.Internal.Entry (NumEntries (NumEntries),
                     unNumEntries)
import           Database.LSMTree.Internal.Page (NumPages (NumPages),
                     PageNo (PageNo), PageSpan (PageSpan))
import           Database.LSMTree.Internal.Serialise
                     (SerialisedKey (SerialisedKey'))
import           Database.LSMTree.Internal.Unsliced (Unsliced, makeUnslicedKey)
import           Database.LSMTree.Internal.Vector (binarySearchL, mkPrimVector)

{-|
    The type–version indicator for the ordinary index and its serialisation
    format as supported by this module.
-}
supportedTypeAndVersion :: Word32
supportedTypeAndVersion :: Word32
supportedTypeAndVersion = Word32
0x0101

{-|
    A general-purpose fence pointer index.

    An index is represented by a vector that maps the number of each page to the
    key stored last in this page or, if the page is an overflow page, to the key
    of the corresponding key–value pair. The vector must have the following
    properties:

      * It is non-empty.

      * Its elements are non-decreasing.

    This restriction follows from the fact that a run must contain keys in
    ascending order and must comprise at least one page for 'search' to be able
    to return a valid page span.
-}
newtype IndexOrdinary = IndexOrdinary (Vector (Unsliced SerialisedKey))
    deriving stock (IndexOrdinary -> IndexOrdinary -> Bool
(IndexOrdinary -> IndexOrdinary -> Bool)
-> (IndexOrdinary -> IndexOrdinary -> Bool) -> Eq IndexOrdinary
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: IndexOrdinary -> IndexOrdinary -> Bool
== :: IndexOrdinary -> IndexOrdinary -> Bool
$c/= :: IndexOrdinary -> IndexOrdinary -> Bool
/= :: IndexOrdinary -> IndexOrdinary -> Bool
Eq, Int -> IndexOrdinary -> ShowS
[IndexOrdinary] -> ShowS
IndexOrdinary -> String
(Int -> IndexOrdinary -> ShowS)
-> (IndexOrdinary -> String)
-> ([IndexOrdinary] -> ShowS)
-> Show IndexOrdinary
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> IndexOrdinary -> ShowS
showsPrec :: Int -> IndexOrdinary -> ShowS
$cshow :: IndexOrdinary -> String
show :: IndexOrdinary -> String
$cshowList :: [IndexOrdinary] -> ShowS
showList :: [IndexOrdinary] -> ShowS
Show)

instance NFData IndexOrdinary where

    rnf :: IndexOrdinary -> ()
rnf (IndexOrdinary Vector (Unsliced SerialisedKey)
unslicedLastKeys) = Vector (Unsliced SerialisedKey) -> ()
forall a. NFData a => a -> ()
rnf Vector (Unsliced SerialisedKey)
unslicedLastKeys

toUnslicedLastKeys :: IndexOrdinary -> Vector (Unsliced SerialisedKey)
toUnslicedLastKeys :: IndexOrdinary -> Vector (Unsliced SerialisedKey)
toUnslicedLastKeys (IndexOrdinary Vector (Unsliced SerialisedKey)
unslicedLastKeys) = Vector (Unsliced SerialisedKey)
unslicedLastKeys

{-|
    For a specification of this operation, see the documentation of [its
    type-agnostic version]('Database.LSMTree.Internal.Index.search').
-}
search :: SerialisedKey -> IndexOrdinary -> PageSpan
search :: SerialisedKey -> IndexOrdinary -> PageSpan
search SerialisedKey
key (IndexOrdinary Vector (Unsliced SerialisedKey)
unslicedLastKeys)
  -- TODO: ideally, we could assert that an index is never empty, but
  -- unfortunately we can not currently do this. Runs (and thefeore indexes)
  -- /can/ be empty if they were created by a last-level merge where all input
  -- entries were deletes. Other parts of the @lsm-tree@ code won't fail as long
  -- as we return @PageSpan 0 0@ when we search an empty ordinary index. The
  -- ideal fix would be to remove empty runs from the levels entirely, but this
  -- requires more involved changes to the merge schedule and until then we'll
  -- just hack the @pageCount <= 0@ case in.
  | Int
pageCount Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
0 = PageNo -> PageNo -> PageSpan
PageSpan (Int -> PageNo
PageNo Int
0) (Int -> PageNo
PageNo Int
0)
  | Bool
otherwise = Bool -> PageSpan -> PageSpan
forall a. (?callStack::CallStack) => Bool -> a -> a
assert (Int
pageCount Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0) PageSpan
result where

    protoStart :: Int
    !protoStart :: Int
protoStart = Vector (Unsliced SerialisedKey) -> Unsliced SerialisedKey -> Int
forall a. Ord a => Vector a -> a -> Int
binarySearchL Vector (Unsliced SerialisedKey)
unslicedLastKeys (SerialisedKey -> Unsliced SerialisedKey
makeUnslicedKey SerialisedKey
key)

    pageCount :: Int
    !pageCount :: Int
pageCount = Vector (Unsliced SerialisedKey) -> Int
forall a. Vector a -> Int
length Vector (Unsliced SerialisedKey)
unslicedLastKeys

    result :: PageSpan
    result :: PageSpan
result | Int
protoStart Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< Int
pageCount
               = let

                     unslicedResultKey :: Unsliced SerialisedKey
                     !unslicedResultKey :: Unsliced SerialisedKey
unslicedResultKey = Vector (Unsliced SerialisedKey)
unslicedLastKeys Vector (Unsliced SerialisedKey) -> Int -> Unsliced SerialisedKey
forall a. Vector a -> Int -> a
! Int
protoStart

                     end :: Int
                     !end :: Int
end = Int -> (Int -> Int) -> Maybe Int -> Int
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (Int -> Int
forall a. Enum a => a -> a
pred Int
pageCount) (Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
protoStart) (Maybe Int -> Int) -> Maybe Int -> Int
forall a b. (a -> b) -> a -> b
$
                            (Unsliced SerialisedKey -> Bool)
-> Vector (Unsliced SerialisedKey) -> Maybe Int
forall a. (a -> Bool) -> Vector a -> Maybe Int
findIndex (Unsliced SerialisedKey -> Unsliced SerialisedKey -> Bool
forall a. Eq a => a -> a -> Bool
/= Unsliced SerialisedKey
unslicedResultKey) (Vector (Unsliced SerialisedKey) -> Maybe Int)
-> Vector (Unsliced SerialisedKey) -> Maybe Int
forall a b. (a -> b) -> a -> b
$
                            Int
-> Vector (Unsliced SerialisedKey)
-> Vector (Unsliced SerialisedKey)
forall a. Int -> Vector a -> Vector a
drop (Int -> Int
forall a. Enum a => a -> a
succ Int
protoStart) Vector (Unsliced SerialisedKey)
unslicedLastKeys

                 in PageNo -> PageNo -> PageSpan
PageSpan (Int -> PageNo
PageNo (Int -> PageNo) -> Int -> PageNo
forall a b. (a -> b) -> a -> b
$ Int
protoStart)
                             (Int -> PageNo
PageNo (Int -> PageNo) -> Int -> PageNo
forall a b. (a -> b) -> a -> b
$ Int
end)
           | Bool
otherwise
               = let

                     unslicedResultKey :: Unsliced SerialisedKey
                     !unslicedResultKey :: Unsliced SerialisedKey
unslicedResultKey = Vector (Unsliced SerialisedKey) -> Unsliced SerialisedKey
forall a. Vector a -> a
last Vector (Unsliced SerialisedKey)
unslicedLastKeys

                     start :: Int
                     !start :: Int
start = Int -> (Int -> Int) -> Maybe Int -> Int
forall b a. b -> (a -> b) -> Maybe a -> b
maybe Int
0 Int -> Int
forall a. Enum a => a -> a
succ (Maybe Int -> Int) -> Maybe Int -> Int
forall a b. (a -> b) -> a -> b
$
                              (Unsliced SerialisedKey -> Bool)
-> Vector (Unsliced SerialisedKey) -> Maybe Int
forall a. (a -> Bool) -> Vector a -> Maybe Int
findIndexR (Unsliced SerialisedKey -> Unsliced SerialisedKey -> Bool
forall a. Eq a => a -> a -> Bool
/= Unsliced SerialisedKey
unslicedResultKey) (Vector (Unsliced SerialisedKey) -> Maybe Int)
-> Vector (Unsliced SerialisedKey) -> Maybe Int
forall a b. (a -> b) -> a -> b
$
                              Vector (Unsliced SerialisedKey)
unslicedLastKeys

                 in PageNo -> PageNo -> PageSpan
PageSpan (Int -> PageNo
PageNo (Int -> PageNo) -> Int -> PageNo
forall a b. (a -> b) -> a -> b
$ Int
start)
                             (Int -> PageNo
PageNo (Int -> PageNo) -> Int -> PageNo
forall a b. (a -> b) -> a -> b
$ Int -> Int
forall a. Enum a => a -> a
pred Int
pageCount)

{-|
    For a specification of this operation, see the documentation of [its
    type-agnostic version]('Database.LSMTree.Internal.Index.sizeInPages').
-}
sizeInPages :: IndexOrdinary -> NumPages
sizeInPages :: IndexOrdinary -> NumPages
sizeInPages (IndexOrdinary Vector (Unsliced SerialisedKey)
unslicedLastKeys)
    = Word -> NumPages
NumPages (Word -> NumPages) -> Word -> NumPages
forall a b. (a -> b) -> a -> b
$ Int -> Word
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Vector (Unsliced SerialisedKey) -> Int
forall a. Vector a -> Int
length Vector (Unsliced SerialisedKey)
unslicedLastKeys)

{-|
    For a specification of this operation, see the documentation of [its
    type-agnostic version]('Database.LSMTree.Internal.Index.headerLBS').
-}
headerLBS :: LazyByteString
headerLBS :: LazyByteString
headerLBS = Builder -> LazyByteString
toLazyByteString        (Builder -> LazyByteString) -> Builder -> LazyByteString
forall a b. (a -> b) -> a -> b
$
            Word32 -> Builder
word32Host              (Word32 -> Builder) -> Word32 -> Builder
forall a b. (a -> b) -> a -> b
$
            Word32
supportedTypeAndVersion

{-|
    For a specification of this operation, see the documentation of [its
    type-agnostic version]('Database.LSMTree.Internal.Index.finalLBS').
-}
finalLBS :: NumEntries -> IndexOrdinary -> LazyByteString
finalLBS :: NumEntries -> IndexOrdinary -> LazyByteString
finalLBS NumEntries
entryCount IndexOrdinary
_ = Builder -> LazyByteString
toLazyByteString (Builder -> LazyByteString) -> Builder -> LazyByteString
forall a b. (a -> b) -> a -> b
$
                        Word64 -> Builder
word64Host       (Word64 -> Builder) -> Word64 -> Builder
forall a b. (a -> b) -> a -> b
$
                        Int -> Word64
forall a b. (Integral a, Num b) => a -> b
fromIntegral     (Int -> Word64) -> Int -> Word64
forall a b. (a -> b) -> a -> b
$
                        NumEntries -> Int
unNumEntries     (NumEntries -> Int) -> NumEntries -> Int
forall a b. (a -> b) -> a -> b
$
                        NumEntries
entryCount

{-|
    For a specification of this operation, see the documentation of [its
    type-agnostic version]('Database.LSMTree.Internal.Index.fromSBS').
-}
fromSBS :: ShortByteString -> Either String (NumEntries, IndexOrdinary)
fromSBS :: ShortByteString -> Either String (NumEntries, IndexOrdinary)
fromSBS shortByteString :: ShortByteString
shortByteString@(SBS ByteArray#
unliftedByteArray)
    | Int
fullSize Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< Int
12
        = String -> Either String (NumEntries, IndexOrdinary)
forall a b. a -> Either a b
Left String
"Doesn't contain header and footer"
    | Word32
typeAndVersion Word32 -> Word32 -> Bool
forall a. Eq a => a -> a -> Bool
== Word32 -> Word32
byteSwap32 Word32
supportedTypeAndVersion
        = String -> Either String (NumEntries, IndexOrdinary)
forall a b. a -> Either a b
Left String
"Non-matching endianness"
    | Word32
typeAndVersion Word32 -> Word32 -> Bool
forall a. Eq a => a -> a -> Bool
/= Word32
supportedTypeAndVersion
        = String -> Either String (NumEntries, IndexOrdinary)
forall a b. a -> Either a b
Left String
"Unsupported type or version"
    | Bool
otherwise
        = (,) (NumEntries -> IndexOrdinary -> (NumEntries, IndexOrdinary))
-> Either String NumEntries
-> Either String (IndexOrdinary -> (NumEntries, IndexOrdinary))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Either String NumEntries
entryCount Either String (IndexOrdinary -> (NumEntries, IndexOrdinary))
-> Either String IndexOrdinary
-> Either String (NumEntries, IndexOrdinary)
forall a b.
Either String (a -> b) -> Either String a -> Either String b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Either String IndexOrdinary
index
    where

    fullSize :: Int
    fullSize :: Int
fullSize = ShortByteString -> Int
ShortByteString.length ShortByteString
shortByteString

    byteArray :: ByteArray
    byteArray :: ByteArray
byteArray = ByteArray# -> ByteArray
ByteArray ByteArray#
unliftedByteArray

    fullBytes :: Primitive.Vector Word8
    fullBytes :: Vector Word8
fullBytes = Int -> Int -> ByteArray -> Vector Word8
forall a. Prim a => Int -> Int -> ByteArray -> Vector a
mkPrimVector Int
0 Int
fullSize ByteArray
byteArray

    typeAndVersion :: Word32
    typeAndVersion :: Word32
typeAndVersion = ByteArray -> Int -> Word32
forall a. Prim a => ByteArray -> Int -> a
indexByteArray ByteArray
byteArray Int
0

    postTypeAndVersionBytes :: Primitive.Vector Word8
    postTypeAndVersionBytes :: Vector Word8
postTypeAndVersionBytes = Int -> Vector Word8 -> Vector Word8
forall a. Prim a => Int -> Vector a -> Vector a
Primitive.drop Int
4 Vector Word8
fullBytes

    lastKeysBytes, entryCountBytes :: Primitive.Vector Word8
    (Vector Word8
lastKeysBytes, Vector Word8
entryCountBytes)
        = Int -> Vector Word8 -> (Vector Word8, Vector Word8)
forall a. Prim a => Int -> Vector a -> (Vector a, Vector a)
Primitive.splitAt (Int
fullSize Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
12) Vector Word8
postTypeAndVersionBytes

    entryCount :: Either String NumEntries
    entryCount :: Either String NumEntries
entryCount | Word64 -> Integer
forall a. Integral a => a -> Integer
toInteger Word64
asWord64 Integer -> Integer -> Bool
forall a. Ord a => a -> a -> Bool
> Int -> Integer
forall a. Integral a => a -> Integer
toInteger (Int
forall a. Bounded a => a
maxBound :: Int)
                   = String -> Either String NumEntries
forall a b. a -> Either a b
Left String
"Number of entries not representable as Int"
               | Bool
otherwise
                   = NumEntries -> Either String NumEntries
forall a b. b -> Either a b
Right (Int -> NumEntries
NumEntries (Word64 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Word64
asWord64))
        where

        asWord64 :: Word64
        asWord64 :: Word64
asWord64 = ByteArray -> Int -> Word64
forall a. Prim a => ByteArray -> Int -> a
indexByteArray ByteArray
entryCountRep Int
0

        entryCountRep :: ByteArray
        Primitive.Vector Int
_ Int
_ ByteArray
entryCountRep = Vector Word8 -> Vector Word8
forall a. Prim a => Vector a -> Vector a
Primitive.force Vector Word8
entryCountBytes

    index :: Either String IndexOrdinary
    index :: Either String IndexOrdinary
index = Vector (Unsliced SerialisedKey) -> IndexOrdinary
IndexOrdinary (Vector (Unsliced SerialisedKey) -> IndexOrdinary)
-> ([Unsliced SerialisedKey] -> Vector (Unsliced SerialisedKey))
-> [Unsliced SerialisedKey]
-> IndexOrdinary
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [Unsliced SerialisedKey] -> Vector (Unsliced SerialisedKey)
forall a. [a] -> Vector a
fromList ([Unsliced SerialisedKey] -> IndexOrdinary)
-> Either String [Unsliced SerialisedKey]
-> Either String IndexOrdinary
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Vector Word8 -> Either String [Unsliced SerialisedKey]
unslicedLastKeys Vector Word8
lastKeysBytes

    unslicedLastKeys :: Primitive.Vector Word8
                     -> Either String [Unsliced SerialisedKey]
    unslicedLastKeys :: Vector Word8 -> Either String [Unsliced SerialisedKey]
unslicedLastKeys Vector Word8
bytes
        | Vector Word8 -> Bool
forall a. Prim a => Vector a -> Bool
Primitive.null Vector Word8
bytes
            = [Unsliced SerialisedKey] -> Either String [Unsliced SerialisedKey]
forall a b. b -> Either a b
Right []
        | Bool
otherwise
            = do
                  Bool -> Either String () -> Either String ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Vector Word8 -> Int
forall a. Prim a => Vector a -> Int
Primitive.length Vector Word8
bytes Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< Int
2)
                       (String -> Either String ()
forall a b. a -> Either a b
Left String
"Too few bytes for key size")
                  let

                      firstSizeRep :: ByteArray
                      Primitive.Vector Int
_ Int
_ ByteArray
firstSizeRep
                          = Vector Word8 -> Vector Word8
forall a. Prim a => Vector a -> Vector a
Primitive.force (Int -> Vector Word8 -> Vector Word8
forall a. Prim a => Int -> Vector a -> Vector a
Primitive.take Int
2 Vector Word8
bytes)

                      firstSize :: Int
                      firstSize :: Int
firstSize = Word16 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Word16 -> Int) -> Word16 -> Int
forall a b. (a -> b) -> a -> b
$
                                  (ByteArray -> Int -> Word16
forall a. Prim a => ByteArray -> Int -> a
indexByteArray ByteArray
firstSizeRep Int
0 :: Word16)

                      postFirstSizeBytes :: Primitive.Vector Word8
                      postFirstSizeBytes :: Vector Word8
postFirstSizeBytes = Int -> Vector Word8 -> Vector Word8
forall a. Prim a => Int -> Vector a -> Vector a
Primitive.drop Int
2 Vector Word8
bytes

                  Bool -> Either String () -> Either String ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Vector Word8 -> Int
forall a. Prim a => Vector a -> Int
Primitive.length Vector Word8
postFirstSizeBytes Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< Int
firstSize)
                       (String -> Either String ()
forall a b. a -> Either a b
Left String
"Too few bytes for key")
                  let

                      firstBytes, othersBytes :: Primitive.Vector Word8
                      (Vector Word8
firstBytes, Vector Word8
othersBytes)
                          = Int -> Vector Word8 -> (Vector Word8, Vector Word8)
forall a. Prim a => Int -> Vector a -> (Vector a, Vector a)
Primitive.splitAt Int
firstSize Vector Word8
postFirstSizeBytes

                      first :: Unsliced SerialisedKey
                      !first :: Unsliced SerialisedKey
first = SerialisedKey -> Unsliced SerialisedKey
makeUnslicedKey (Vector Word8 -> SerialisedKey
SerialisedKey' Vector Word8
firstBytes)

                  [Unsliced SerialisedKey]
others <- Vector Word8 -> Either String [Unsliced SerialisedKey]
unslicedLastKeys Vector Word8
othersBytes
                  [Unsliced SerialisedKey] -> Either String [Unsliced SerialisedKey]
forall a. a -> Either String a
forall (m :: * -> *) a. Monad m => a -> m a
return (Unsliced SerialisedKey
first Unsliced SerialisedKey
-> [Unsliced SerialisedKey] -> [Unsliced SerialisedKey]
forall a. a -> [a] -> [a]
: [Unsliced SerialisedKey]
others)