{-# OPTIONS_HADDOCK not-home #-}
{- HLINT ignore "Avoid restricted alias" -}

-- | Chunks of bytes, typically output during incremental index serialisation.
module Database.LSMTree.Internal.Chunk
(
    -- * Chunks
    Chunk (Chunk),
    toByteVector,
    toByteString,

    -- * Balers
    Baler (Baler),
    createBaler,
    feedBaler,
    unsafeEndBaler
)
where

import           Prelude hiding (length)

import           Control.Exception (assert)
import           Control.Monad.ST.Strict (ST)
import           Data.ByteString (ByteString)
import           Data.List (scanl')
import           Data.Primitive.PrimVar (PrimVar, newPrimVar, readPrimVar,
                     writePrimVar)
import           Data.Vector.Primitive (Vector (Vector), length, unsafeCopy,
                     unsafeFreeze)
import           Data.Vector.Primitive.Mutable (MVector)
import qualified Data.Vector.Primitive.Mutable as Mutable (drop, length, slice,
                     take, unsafeCopy, unsafeNew)
import           Data.Word (Word8)
import           Database.LSMTree.Internal.ByteString (byteArrayToByteString)

-- * Chunks

-- | A chunk of bytes, typically output during incremental index serialisation.
--
-- A chunk is a thin wrapper around a primitive vector of 'Word8'. Both
-- instances are stock-derived, so equality is decided by the wrapped vector.
newtype Chunk = Chunk (Vector Word8)
    deriving stock (Eq, Show)

-- | Yields the contents of a chunk as a byte vector.
--
-- This only removes the 'Chunk' constructor; the wrapped vector is returned
-- as is.
toByteVector :: Chunk -> Vector Word8
toByteVector (Chunk byteVector) = byteVector

-- | Yields the contents of a chunk as a (strict) byte string.
--
-- The wrapped vector is taken apart into its offset, its length, and its
-- underlying byte array, and these three components are handed unchanged to
-- 'byteArrayToByteString'.
toByteString :: Chunk -> ByteString
toByteString (Chunk (Vector vecOffset vecLength byteArray))
    = byteArrayToByteString vecOffset vecLength byteArray

-- * Balers

{-|
    A stateful repackager of bytes: it accepts blocks of bytes and hands them
    back as chunks, where every chunk has at least a given minimum size, with
    the possible exception of a remnant chunk at the very end.
-}
data Baler s
    = Baler
          !(MVector s Word8) -- Buffer holding the bytes queued so far
          !(PrimVar s Int)   -- Reference to the count of bytes currently queued

-- | Creates a new baler.
--
-- The internal buffer is allocated with room for one byte less than the
-- minimum chunk size and the count of queued bytes starts at zero.
--
-- The minimum chunk size must be positive. This precondition is checked only
-- by 'assert', so it is not enforced when assertions are disabled.
createBaler :: Int            -- ^ Minimum chunk size in bytes
            -> ST s (Baler s) -- ^ Creation of the baler
createBaler minChunkSize = assert (minChunkSize > 0)              $
                           Baler                                 <$>
                           Mutable.unsafeNew (pred minChunkSize) <*>
                           newPrimVar 0

{-|
    Feeds a baler blocks of bytes.

    Bytes received by a baler are generally queued for later output, but if
    feeding new bytes makes the accumulated content reach or exceed the minimum
    chunk size (that is, it no longer fits into the baler's buffer) then a chunk
    containing all the accumulated content is output and the queue is emptied.

    The result is 'Nothing' when the bytes were merely queued and 'Just' the
    emitted chunk otherwise.
-}
feedBaler :: forall s . [Vector Word8] -> Baler s -> ST s (Maybe Chunk)
feedBaler blocks (Baler buffer remnantSizeRef) = do
    remnantSize <- readPrimVar remnantSizeRef
    let

        -- Number of bytes supplied by this call
        inputSize :: Int
        !inputSize = sum blockSizes

        -- Number of bytes queued so far plus the bytes supplied by this call
        totalSize :: Int
        !totalSize = remnantSize + inputSize

    if totalSize <= Mutable.length buffer
        then do
                 -- Everything still fits into the buffer: append the new
                 -- bytes behind the already queued ones and emit nothing.
                 unsafeCopyBlocks (Mutable.drop remnantSize buffer)
                 writePrimVar remnantSizeRef totalSize
                 return Nothing
        else do
                 -- The buffer would overflow: assemble a fresh vector from the
                 -- queued bytes followed by the new bytes and emit it.
                 protoChunk <- Mutable.unsafeNew totalSize
                 Mutable.unsafeCopy (Mutable.take remnantSize protoChunk)
                                    (Mutable.take remnantSize buffer)
                 unsafeCopyBlocks (Mutable.drop remnantSize protoChunk)
                 writePrimVar remnantSizeRef 0
                 -- Freezing without copying is fine here because 'protoChunk'
                 -- is not written to after this point.
                 Just . Chunk <$> unsafeFreeze protoChunk
    where

    -- Sizes of the supplied blocks, in order
    blockSizes :: [Int]
    blockSizes = map length blocks

    -- Copies all supplied blocks, back to back, to the beginning of the given
    -- vector, which must be large enough to hold them.
    unsafeCopyBlocks :: MVector s Word8 -> ST s ()
    unsafeCopyBlocks vec
        = sequence_ $
          zipWith3 (\ start size block -> unsafeCopy
                                              (Mutable.slice start size vec)
                                              block)
                   (scanl' (+) 0 blockSizes) -- start offset of each block
                   blockSizes
                   blocks

{-|
    Returns the bytes still queued in a baler, if any, thereby invalidating the
    baler. Executing @unsafeEndBaler baler@ is only safe when @baler@ is not
    used afterwards.

    The result is 'Nothing' when no bytes are queued. Otherwise it is a chunk
    made by freezing the used prefix of the baler's buffer in place; the count
    of queued bytes is not reset.
-}
unsafeEndBaler :: forall s . Baler s -> ST s (Maybe Chunk)
unsafeEndBaler (Baler buffer remnantSizeRef) = do
    remnantSize <- readPrimVar remnantSizeRef
    if remnantSize == 0
        then return Nothing
        else do
                 let

                     -- The part of the buffer that actually holds queued bytes
                     protoChunk :: MVector s Word8
                     !protoChunk = Mutable.take remnantSize buffer

                 -- The buffer is frozen rather than copied, which is why the
                 -- baler must not be fed again afterwards.
                 chunk <- Chunk <$> unsafeFreeze protoChunk
                 return (Just chunk)