{-# LANGUAGE BangPatterns   #-}
{-# LANGUAGE DeriveFunctor  #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# OPTIONS_HADDOCK not-home #-}

module Database.LSMTree.Internal.RawOverflowPage (
    RawOverflowPage (..),
    makeRawOverflowPage,
    unsafeMakeRawOverflowPage,
    rawOverflowPageRawBytes,
    rawOverflowPageToByteString,
    rawBytesToOverflowPages,
    pinnedByteArrayToOverflowPages,
    unpinnedByteArrayToOverflowPages,
) where

import           Control.DeepSeq (NFData (rnf))
import           Control.Monad (when)
import           Data.ByteString (ByteString)
import           Data.Primitive.ByteArray (ByteArray (..), copyByteArray,
                     fillByteArray, isByteArrayPinned, newPinnedByteArray,
                     runByteArray)
import qualified Data.Vector.Primitive as VP
import           Database.LSMTree.Internal.Assertions
import           Database.LSMTree.Internal.BitMath (roundUpToPageSize)
import           Database.LSMTree.Internal.RawBytes (RawBytes (..))
import qualified Database.LSMTree.Internal.RawBytes as RB

-------------------------------------------------------------------------------
-- RawOverflowPage type
-------------------------------------------------------------------------------

-- | When a key\/op pair is too large to fit in a single disk page, the
-- representation is split into a normal page, and one or more overflow pages.
-- The normal 'RawPage' follows the run page format, and contains the key,
-- optional blob reference and a prefix of the value, while the overflow pages
-- contain the suffix of a large value that didn't fit in the normal page.
--
-- Each overflow page is the same size as normal pages (currently 4096 only).
--
data RawOverflowPage = RawOverflowPage
                         !Int        -- ^ offset in Word8s.
                         !ByteArray  -- ^ backing byte array; the page is the
                                     -- 4096 bytes starting at the offset.
  deriving stock (Show)

-- | This invariant is the same as for 'RawPage', but there is no alignment
-- constraint. This is for two reasons: 1. we don't need alignment, because
-- the page has no structure that we need to read with aligned memory
-- operations; 2. we don't want alignment because we want to convert the suffix
-- of serialised values (as 'RawBytes') into a 'RawOverflowPage' without
-- copying, and a suffix can start at an arbitrary offset.
--
invariant :: RawOverflowPage -> Bool
invariant (RawOverflowPage off ba) = and
    [ isValidSlice off 4096 ba    -- valid bytearray slice for length 4096
    , isByteArrayPinned ba        -- bytearray must be pinned (for I/O)
    ]

instance NFData RawOverflowPage where
  -- Both constructor fields are strict, so matching on the constructor
  -- already forces the offset and the byte array; nothing more to do.
  rnf (RawOverflowPage _ _) = ()

-- | This instance assumes pages are 4096 bytes in size
instance Eq RawOverflowPage where
    -- Compare the 4096 byte 'RawBytes' views of the two pages, rather than
    -- the raw (offset, byte array) pairs.
    r1 == r2 = rawOverflowPageRawBytes r1 == rawOverflowPageRawBytes r2

-- | View an overflow page as 'RawBytes': the 4096 bytes of the underlying
-- byte array that start at the page's offset.
rawOverflowPageRawBytes :: RawOverflowPage -> RawBytes
rawOverflowPageRawBytes (RawOverflowPage off ba) =
    RB.fromByteArray off 4096 ba

-- | \( O(1) \) since we can avoid copying the pinned byte array.
rawOverflowPageToByteString :: RawOverflowPage -> ByteString
rawOverflowPageToByteString =
    -- The pinned conversion is used on the page's 'RawBytes' view; the
    -- 'RawOverflowPage' invariant requires the byte array to be pinned.
    RB.unsafePinnedToByteString . rawOverflowPageRawBytes

-- | Create a 'RawOverflowPage'.
--
-- The length must be 4096 or less.
--
-- This function will copy data if the byte array is not pinned, or the length
-- is strictly less than 4096.
--
makeRawOverflowPage ::
       ByteArray  -- ^ bytearray
    -> Int        -- ^ offset in bytes into the bytearray
    -> Int        -- ^ length in bytes, must be @>= 0 && <= 4096@
    -> RawOverflowPage
makeRawOverflowPage ba off len
    -- A full page whose slice already satisfies the invariant (valid slice,
    -- pinned) is used as-is, without copying.
    | len == 4096
    , let page = RawOverflowPage off ba
    , invariant page
    = page

    -- Anything else (short length, or invariant not satisfied) is copied
    -- into a fresh page.
    | otherwise
    = makeRawOverflowPageCopy ba off len

-- | Create a 'RawOverflowPage' by copying the given slice into a freshly
-- allocated pinned byte array of 4096 bytes. The slice is placed at the start
-- of the new page (so the page offset is 0) and any remaining bytes are
-- filled with zeros.
--
-- The slice must be valid for the byte array and the length must be at most
-- 4096; both are checked with assertions.
--
makeRawOverflowPageCopy ::
       ByteArray  -- ^ bytearray
    -> Int        -- ^ offset in bytes into the bytearray
    -> Int        -- ^ length in bytes
    -> RawOverflowPage
makeRawOverflowPageCopy ba off len =
    assert (isValidSlice off len ba && len <= 4096) $
    (\page -> assert (invariant page) page) $
    RawOverflowPage 0 $ runByteArray $ do
      mba <- newPinnedByteArray 4096
      -- Clamp the length into [0, 4096]. The clamping would only do anything
      -- with assertions off: it keeps an out-of-range length from being
      -- passed on to the copy and fill below.
      let suffixlen = max 0 (min 4096 len)
      copyByteArray mba 0 ba off suffixlen
      -- Zero the remainder of the page after the copied bytes.
      when (suffixlen < 4096) $
        fillByteArray mba suffixlen (4096 - suffixlen) 0
      return mba

-- | Create a 'RawOverflowPage' without copying. The byte array and offset must
-- satisfy the invariant for 'RawOverflowPage'.
--
unsafeMakeRawOverflowPage ::
       ByteArray  -- ^ bytearray, must be pinned and contain 4096 bytes (after offset)
    -> Int        -- ^ offset in bytes
    -> RawOverflowPage
unsafeMakeRawOverflowPage ba off =
    -- The invariant is only checked via an assertion; no copy is ever made.
    assert (invariant page) page
  where
    page = RawOverflowPage off ba

-- | Convert 'RawBytes' representing the \"overflow\" part of a value into one
-- or more 'RawOverflowPage's.
--
-- This will avoid copying where possible.
--
rawBytesToOverflowPages :: RawBytes -> [RawOverflowPage]
rawBytesToOverflowPages (RawBytes (VP.Vector off len ba))
  -- Dispatch on whether the underlying byte array is pinned.
  | isByteArrayPinned ba
  = pinnedByteArrayToOverflowPages off len ba
  | otherwise
  = unpinnedByteArrayToOverflowPages off len ba

-- | Split a slice of a byte array, given as offset and length in bytes, into
-- overflow pages.
--
-- Each complete 4096 byte chunk becomes a page that refers to the given byte
-- array at the corresponding offset (via 'unsafeMakeRawOverflowPage', so the
-- 'RawOverflowPage' invariant is asserted for it). A trailing chunk shorter
-- than 4096 bytes is copied into a fresh, zero-padded page. A length of zero
-- or less yields no pages.
--
pinnedByteArrayToOverflowPages :: Int -> Int -> ByteArray -> [RawOverflowPage]
pinnedByteArrayToOverflowPages !off !len !ba
      | len >= 4096
      , let !page = unsafeMakeRawOverflowPage ba off
      = page : pinnedByteArrayToOverflowPages (off + 4096) (len - 4096) ba

      | len <= 0
      = []

      | otherwise -- > 0 && < 4096
                  -- have to copy the partial last page
      , let !page = makeRawOverflowPageCopy ba off len
      = [page]

-- | Not pinned, in principle shouldn't happen much because if the value
-- is big enough to overflow then it's big enough to be pinned.
-- It is possible however if a page has a huge key and a small value.
--
-- Unfortunately, with GHC versions 9.6.x we also get this because the meaning
-- of pinned has changed. Sigh.
-- See <https://gitlab.haskell.org/ghc/ghc/-/issues/22255>
--
unpinnedByteArrayToOverflowPages :: Int -> Int -> ByteArray -> [RawOverflowPage]
unpinnedByteArrayToOverflowPages !off !len !ba =
    let !lenPages = roundUpToPageSize len
        -- Copy the slice into a single new pinned byte array of the rounded-up
        -- length, and zero the padding after the copied bytes.
        ba'       = runByteArray $ do
                      mba <- newPinnedByteArray lenPages
                      copyByteArray mba 0 ba off len
                      fillByteArray mba len (lenPages - len) 0
                      return mba
        -- Slice the new byte array into pages, starting from offset 0.
        pages     = pinnedByteArrayToOverflowPages 0 lenPages ba'
        -- We've arranged to do the conversion without any extra copying,
        -- so assert that we got that right:
     in assert (all (isSliceNotCopy ba') pages)
        pages
  where
    -- True when the page refers to the given byte array itself.
    isSliceNotCopy :: ByteArray -> RawOverflowPage -> Bool
    isSliceNotCopy ba1 (RawOverflowPage _ ba2) = sameByteArray ba1 ba2