{-# OPTIONS_HADDOCK not-home #-}

module Database.LSMTree.Internal.BloomFilter (
  bloomFilterToLBS,
  bloomFilterFromSBS,
) where

import           Control.Exception (assert)
import           Control.Monad (when)
import qualified Data.BloomFilter as BF
import qualified Data.BloomFilter.BitVec64 as BV64
import qualified Data.BloomFilter.Internal as BF
import qualified Data.ByteString.Builder.Extra as B
import qualified Data.ByteString.Lazy as LBS
import           Data.ByteString.Short (ShortByteString (SBS))
import qualified Data.Primitive as P
import           Data.Primitive.ByteArray (ByteArray (ByteArray))
import qualified Data.Vector.Primitive as VP
import           Data.Word (Word32, Word64, byteSwap32)
import           Database.LSMTree.Internal.BitMath (ceilDiv64, mul8)
import           Database.LSMTree.Internal.ByteString (byteArrayToByteString)
import           Database.LSMTree.Internal.Vector

-- serialising
-----------------------------------------------------------

-- | Format version tag written as the first field of a serialised bloom
-- filter.
--
-- By writing out the version in host endianness, we also indicate endianness.
-- During deserialisation, we would discover an endianness mismatch.
bloomFilterVersion :: Word32
bloomFilterVersion = 1

-- | Serialise a bloom filter into a lazy 'LBS.ByteString'.
--
-- The output is a 16 byte header followed by the contents of the bit vector:
--
-- * the format version ('bloomFilterVersion') as a host-endian 'Word32',
-- * the number of hash functions as a host-endian 'Word32',
-- * the length of the filter as a host-endian 'Word64',
-- * the 64-bit words backing the bit vector.
--
-- This is the layout that 'bloomFilterFromSBS' reads back.
bloomFilterToLBS :: BF.Bloom a -> LBS.ByteString
bloomFilterToLBS (BF.Bloom hashesN len bv) =
    header <> LBS.fromStrict (bitVec bv)
  where
    header :: LBS.ByteString
    header =
        -- creates a single 16 byte chunk
        B.toLazyByteStringWith (B.safeStrategy 16 B.smallChunkSize) mempty $
             B.word32Host bloomFilterVersion
          <> B.word32Host (fromIntegral hashesN)
          <> B.word64Host len

    -- The vector's offset and length count 64-bit words; 'mul8' scales both
    -- before they are handed to 'byteArrayToByteString' (presumably a byte
    -- offset and a byte length -- confirm against that helper).
    bitVec (BV64.BV64 (VP.Vector off wordCount ba)) =
        byteArrayToByteString (mul8 off) (mul8 wordCount) ba

-- deserialising
-----------------------------------------------------------

-- | Read 'BF.Bloom' from a 'ShortByteString'.
--
-- The input must be 64 bit aligned and exactly contain the serialised bloom
-- filter. In successful case the data portion of bloom filter is /not/ copied
-- (the short bytestring has only 16 bytes of extra data in the header).
--
-- Returns 'Left' with a description of the problem when:
--
-- * the input is shorter than the 16 byte header,
-- * the version field does not equal 'bloomFilterVersion' (a byte-swapped
--   match is reported as a byte order mismatch),
-- * the length field is zero, or is 2^48 bits or more,
-- * the input size differs from the size implied by the length field.
--
bloomFilterFromSBS :: ShortByteString -> Either String (BF.Bloom a)
bloomFilterFromSBS (SBS ba') = do
    when (P.sizeofByteArray ba < 16) $ Left "Doesn't contain a header"

    -- Header layout: two 32-bit fields in bytes 0-7, then one 64-bit field in
    -- bytes 8-15. These bindings are lazy, so nothing is read before the size
    -- check above has passed.
    let ver = P.indexPrimArray word32pa 0
        hsn = P.indexPrimArray word32pa 1
        len = P.indexPrimArray word64pa 1 -- length in bits

    when (ver /= bloomFilterVersion) $ Left $
      if byteSwap32 ver == bloomFilterVersion
      then "Different byte order"
      else "Unsupported version"

    when (len == 0) $ Left "Length is zero"

    -- the length must stay below 2^48 bits
    when (len >= 0x1_0000_0000_0000) $ Left "Too large bloomfilter"

    -- we need to round the size of vector up
    let len64 = fromIntegral (ceilDiv64 len)
    -- make sure the bit vector exactly fits into the byte array
    -- (smaller bit vector could work, but wastes memory and should not happen)
    let bytesUsed = mul8 (2 + len64) -- 2 header words plus the data words
    when (bytesUsed > P.sizeofByteArray ba) $
      Left "Byte array is too small for components"
    when (bytesUsed < P.sizeofByteArray ba) $
      Left "Byte array is too large for components"

    -- The data words start right after the two header words and share the
    -- input's byte array.
    let vec64 :: VP.Vector Word64
        vec64 = mkPrimVector 2 len64 ba

    let bloom = BF.Bloom (fromIntegral hsn) len (BV64.BV64 vec64)
    -- The invariant is only checked when assertions are enabled.
    assert (BF.bloomInvariant bloom) $ return bloom
  where
    -- Three views of the same underlying 'ByteArray#': raw bytes, 32-bit
    -- elements and 64-bit elements.
    ba :: ByteArray
    ba = ByteArray ba'

    word32pa :: P.PrimArray Word32
    word32pa = P.PrimArray ba'

    word64pa :: P.PrimArray Word64
    word64pa = P.PrimArray ba'