{-# LANGUAGE TypeFamilies #-}
{-# OPTIONS_GHC -Wno-partial-fields #-}
{-# OPTIONS_HADDOCK not-home #-}
{- HLINT ignore "Use record patterns" -}

-- | An on-disk store for blobs for the write buffer.
--
-- For table inserts with blobs, the blob get written out immediately to a
-- file, while the rest of the 'Entry' goes into the 'WriteBuffer'. The
-- 'WriteBufferBlobs' manages the storage of the blobs.
--
-- A single write buffer blob file can be shared between multiple tables. As a
-- consequence, the lifetime of the 'WriteBufferBlobs' must be managed using
-- 'new', and with the 'Ref' API: 'releaseRef' and 'dupRef'. When a table is
-- duplicated, the new table needs its own reference, so use 'dupRef' upon
-- duplication.
--
-- Blobs are copied from the write buffer blob file when the write buffer is
-- flushed to make a run. This is needed since the blob file is shared and so
-- not stable by the time one table wants to flush it.
--
-- Not all tables need a blob file so we defer opening the file until it
-- is needed.
--
module Database.LSMTree.Internal.WriteBufferBlobs (
    WriteBufferBlobs (..),
    new,
    open,
    addBlob,
    mkRawBlobRef,
    mkWeakBlobRef,
    -- * For tests
    FilePointer (..)
  ) where

import           Control.DeepSeq (NFData (..))
import           Control.Monad (void)
import           Control.Monad.Class.MonadThrow
import           Control.Monad.Primitive (PrimMonad, PrimState)
import           Control.RefCount
import           Data.Primitive.PrimVar as P
import           Data.Word (Word64)
import           Database.LSMTree.Internal.BlobFile
import qualified Database.LSMTree.Internal.BlobFile as BlobFile
import           Database.LSMTree.Internal.BlobRef (RawBlobRef (..),
                     WeakBlobRef (..))
import           Database.LSMTree.Internal.Serialise
import qualified System.FS.API as FS
import           System.FS.API (HasFS)

-- | A single 'WriteBufferBlobs' may be shared between multiple tables.
-- As a consequence of being shared, the management of the shared state has to
-- be quite careful.
--
-- In particular there is the blob file itself. We may have to write to this
-- blob file from multiple threads on behalf of independent tables.
-- The offset at which we write is thus shared mutable state. Our strategy for
-- the write offset is to explicitly track it (since we need to know the offset
-- to return correct 'BlobSpan's) and then to not use the file's own file
-- pointer. We do this by always writing at specific file offsets rather than
-- writing at the open file's normal file pointer. We use a 'PrimVar' with
-- atomic operations to manage the file offset.
--
-- A consequence of the blob file being shared between the write buffers of
-- many tables is that the blobs in the file will not all belong to one
-- table. The write buffer blob file is unsuitable to use as-is as the
-- blob file for a run when the write buffer is flushed. The run blob file must
-- be immutable and with a known CRC. Whereas because the write buffer blob
-- file is shared, it can still be appended to via inserts in one table
-- while another is trying to flush the write buffer. So there is no stable CRC
-- for the whole file (as required by the snapshot format). Further more we
-- cannot even incrementally calculate the blob file CRC without additional
-- expensive serialisation. To solve this we follow the design that the open
-- file handle for the blob file is only shared between multiple write buffers,
-- and is /not/ shared with the runs once flushed. This separates the lifetimes
-- of the files. Correspondingly, the reference counter is only for
-- tracking the lifetime of the read\/write mode file handle.
--
-- One concern with sharing blob files and the open blob file handle between
-- multiple write buffers is: can we guarantee that the blob file is eventually
-- closed?
--
-- A problematic example would be, starting from a root handle and then
-- repeatedly: duplicating; inserting (with blobs) into the duplicate; and then
-- closing it. This would use only a fixed number of tables at once, but
-- would keep inserting into the same the write buffer blob file. This could be
-- done indefinitely.
--
-- On the other hand, provided that there's a bound on the number of duplicates
-- that are created from any point, and each table is eventually closed,
-- then each write buffer blob file will eventually be closed.
--
-- The latter seems like the more realistic use case, and so the design here is
-- probably reasonable.
--
-- If not, an entirely different approach would be to manage blobs across all
-- runs (and the write buffer) differently: avoiding copying when blobs are
-- merged and using some kind of GC algorithm to recover space for blobs that
-- are not longer needed. There are LSM algorithms that do this for values
-- (i.e. copying keys only during merge and referring to values managed in a
-- separate disk heap), so the same could be applied to blobs.
--
data WriteBufferBlobs m h =
     WriteBufferBlobs {

       -- | The blob file
       --
       -- INVARIANT: the file may contain garbage bytes, but no blob reference
       -- ('RawBlobRef', 'WeakBlobRef', or 'StrongBlobRef) will reference these
       -- bytes.
      forall (m :: * -> *) h. WriteBufferBlobs m h -> Ref (BlobFile m h)
blobFile           :: !(Ref (BlobFile m h))

      -- | The manually tracked file pointer.
      --
      -- INVARIANT: the file pointer points to a file offset at or beyond the
      -- file size.
    , forall (m :: * -> *) h. WriteBufferBlobs m h -> FilePointer m
blobFilePointer    :: !(FilePointer m)

      -- The 'WriteBufferBlobs' is a shared reference-counted object type
    , forall (m :: * -> *) h. WriteBufferBlobs m h -> RefCounter m
writeBufRefCounter :: !(RefCounter m)
    }

instance NFData h => NFData (WriteBufferBlobs m h) where
  rnf :: WriteBufferBlobs m h -> ()
rnf (WriteBufferBlobs Ref (BlobFile m h)
a FilePointer m
b RefCounter m
c) = Ref (BlobFile m h) -> ()
forall a. NFData a => a -> ()
rnf Ref (BlobFile m h)
a () -> () -> ()
forall a b. a -> b -> b
`seq` FilePointer m -> ()
forall a. NFData a => a -> ()
rnf FilePointer m
b () -> () -> ()
forall a b. a -> b -> b
`seq` RefCounter m -> ()
forall a. NFData a => a -> ()
rnf RefCounter m
c

instance RefCounted m (WriteBufferBlobs m h) where
  getRefCounter :: WriteBufferBlobs m h -> RefCounter m
getRefCounter = WriteBufferBlobs m h -> RefCounter m
forall (m :: * -> *) h. WriteBufferBlobs m h -> RefCounter m
writeBufRefCounter

{-# SPECIALISE new :: HasFS IO h -> FS.FsPath -> IO (Ref (WriteBufferBlobs IO h)) #-}
-- | Create a new 'WriteBufferBlobs' with a new file.
--
-- REF: the resulting reference must be released once it is no longer used.
--
-- ASYNC: this should be called with asynchronous exceptions masked because it
-- allocates/creates resources.
new ::
     (PrimMonad m, MonadMask m)
  => HasFS m h
  -> FS.FsPath
  -> m (Ref (WriteBufferBlobs m h))
new :: forall (m :: * -> *) h.
(PrimMonad m, MonadMask m) =>
HasFS m h -> FsPath -> m (Ref (WriteBufferBlobs m h))
new HasFS m h
fs FsPath
blobFileName = HasFS m h
-> FsPath -> AllowExisting -> m (Ref (WriteBufferBlobs m h))
forall (m :: * -> *) h.
(PrimMonad m, MonadMask m) =>
HasFS m h
-> FsPath -> AllowExisting -> m (Ref (WriteBufferBlobs m h))
open HasFS m h
fs FsPath
blobFileName AllowExisting
FS.MustBeNew

{-# SPECIALISE open :: HasFS IO h -> FS.FsPath -> FS.AllowExisting -> IO (Ref (WriteBufferBlobs IO h)) #-}
-- | Open a `WriteBufferBlobs` file and sets the file pointer to the end of the file.
--
-- REF: the resulting reference must be released once it is no longer used.
--
-- ASYNC: this should be called with asynchronous exceptions masked because it
-- allocates/creates resources.
open ::
     (PrimMonad m, MonadMask m)
  => HasFS m h
  -> FS.FsPath
  -> FS.AllowExisting
  -> m (Ref (WriteBufferBlobs m h))
open :: forall (m :: * -> *) h.
(PrimMonad m, MonadMask m) =>
HasFS m h
-> FsPath -> AllowExisting -> m (Ref (WriteBufferBlobs m h))
open HasFS m h
fs FsPath
blobFileName AllowExisting
blobFileAllowExisting = do
    -- Must use read/write mode because we write blobs when adding, but
    -- we can also be asked to retrieve blobs at any time.
    m (Ref (BlobFile m h))
-> (Ref (BlobFile m h) -> m ())
-> (Ref (BlobFile m h) -> m (Ref (WriteBufferBlobs m h)))
-> m (Ref (WriteBufferBlobs m h))
forall a b c. m a -> (a -> m b) -> (a -> m c) -> m c
forall (m :: * -> *) a b c.
MonadCatch m =>
m a -> (a -> m b) -> (a -> m c) -> m c
bracketOnError
      (HasFS m h -> FsPath -> OpenMode -> m (Ref (BlobFile m h))
forall (m :: * -> *) h.
(PrimMonad m, MonadCatch m, HasCallStack) =>
HasFS m h -> FsPath -> OpenMode -> m (Ref (BlobFile m h))
openBlobFile HasFS m h
fs FsPath
blobFileName (AllowExisting -> OpenMode
FS.ReadWriteMode AllowExisting
blobFileAllowExisting))
      Ref (BlobFile m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef
      (HasFS m h -> Ref (BlobFile m h) -> m (Ref (WriteBufferBlobs m h))
forall (m :: * -> *) h.
(PrimMonad m, MonadMask m) =>
HasFS m h -> Ref (BlobFile m h) -> m (Ref (WriteBufferBlobs m h))
fromBlobFile HasFS m h
fs)

{-# SPECIALISE fromBlobFile :: HasFS IO h -> Ref (BlobFile IO h) -> IO (Ref (WriteBufferBlobs IO h)) #-}
-- | Make a `WriteBufferBlobs` from a `BlobFile` and set the file pointer to the
-- end of the file.
--
-- REF: the resulting reference must be released once it is no longer used.
--
-- ASYNC: this should be called with asynchronous exceptions masked because it
-- allocates/creates resources.
fromBlobFile ::
     (PrimMonad m, MonadMask m)
  => HasFS m h
  -> Ref (BlobFile m h)
  -> m (Ref (WriteBufferBlobs m h))
fromBlobFile :: forall (m :: * -> *) h.
(PrimMonad m, MonadMask m) =>
HasFS m h -> Ref (BlobFile m h) -> m (Ref (WriteBufferBlobs m h))
fromBlobFile HasFS m h
fs Ref (BlobFile m h)
blobFile = do
    FilePointer m
blobFilePointer <- m (FilePointer m)
forall (m :: * -> *). PrimMonad m => m (FilePointer m)
newFilePointer
    -- Set the blob file pointer to the end of the file
    Word64
blobFileSize <- Ref (BlobFile m h) -> (BlobFile m h -> m Word64) -> m Word64
forall (m :: * -> *) obj a.
(PrimMonad m, MonadThrow m, HasCallStack) =>
Ref obj -> (obj -> m a) -> m a
withRef Ref (BlobFile m h)
blobFile ((BlobFile m h -> m Word64) -> m Word64)
-> (BlobFile m h -> m Word64) -> m Word64
forall a b. (a -> b) -> a -> b
$ HasFS m h -> HasCallStack => Handle h -> m Word64
forall (m :: * -> *) h.
HasFS m h -> HasCallStack => Handle h -> m Word64
FS.hGetSize HasFS m h
fs (Handle h -> m Word64)
-> (BlobFile m h -> Handle h) -> BlobFile m h -> m Word64
forall b c a. (b -> c) -> (a -> b) -> a -> c
. BlobFile m h -> Handle h
forall (m :: * -> *) h. BlobFile m h -> Handle h
blobFileHandle
    m Word64 -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m Word64 -> m ()) -> (Word64 -> m Word64) -> Word64 -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. FilePointer m -> Int -> m Word64
forall (m :: * -> *).
PrimMonad m =>
FilePointer m -> Int -> m Word64
updateFilePointer FilePointer m
blobFilePointer (Int -> m Word64) -> (Word64 -> Int) -> Word64 -> m Word64
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Word64 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Word64 -> m ()) -> Word64 -> m ()
forall a b. (a -> b) -> a -> b
$ Word64
blobFileSize
    m ()
-> (RefCounter m -> WriteBufferBlobs m h)
-> m (Ref (WriteBufferBlobs m h))
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, HasCallStack) =>
m () -> (RefCounter m -> obj) -> m (Ref obj)
newRef (Ref (BlobFile m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef Ref (BlobFile m h)
blobFile) ((RefCounter m -> WriteBufferBlobs m h)
 -> m (Ref (WriteBufferBlobs m h)))
-> (RefCounter m -> WriteBufferBlobs m h)
-> m (Ref (WriteBufferBlobs m h))
forall a b. (a -> b) -> a -> b
$ \RefCounter m
writeBufRefCounter ->
      WriteBufferBlobs {
        Ref (BlobFile m h)
blobFile :: Ref (BlobFile m h)
blobFile :: Ref (BlobFile m h)
blobFile,
        FilePointer m
blobFilePointer :: FilePointer m
blobFilePointer :: FilePointer m
blobFilePointer,
        RefCounter m
writeBufRefCounter :: RefCounter m
writeBufRefCounter :: RefCounter m
writeBufRefCounter
      }

{-# SPECIALISE addBlob :: HasFS IO h -> Ref (WriteBufferBlobs IO h) -> SerialisedBlob -> IO BlobSpan #-}
-- | Append a blob.
--
-- If no exception is returned, then the file pointer will be set to exactly the
-- file size.
--
-- If an exception is returned, the file pointer points to a file
-- offset at or beyond the file size. The bytes between the old and new offset
-- might be garbage or missing.
addBlob :: (PrimMonad m, MonadThrow m)
        => HasFS m h
        -> Ref (WriteBufferBlobs m h)
        -> SerialisedBlob
        -> m BlobSpan
addBlob :: forall (m :: * -> *) h.
(PrimMonad m, MonadThrow m) =>
HasFS m h
-> Ref (WriteBufferBlobs m h) -> SerialisedBlob -> m BlobSpan
addBlob HasFS m h
fs (DeRef WriteBufferBlobs {Ref (BlobFile m h)
blobFile :: forall (m :: * -> *) h. WriteBufferBlobs m h -> Ref (BlobFile m h)
blobFile :: Ref (BlobFile m h)
blobFile, FilePointer m
blobFilePointer :: forall (m :: * -> *) h. WriteBufferBlobs m h -> FilePointer m
blobFilePointer :: FilePointer m
blobFilePointer}) SerialisedBlob
blob = do
    let blobsize :: Int
blobsize = SerialisedBlob -> Int
sizeofBlob SerialisedBlob
blob
    -- If an exception happens after updating the file pointer, then no write
    -- takes place. The next 'addBlob' will start writing at the new file
    -- offset, so there are going to be some uninitialised bytes in the file.
    Word64
bloboffset <- FilePointer m -> Int -> m Word64
forall (m :: * -> *).
PrimMonad m =>
FilePointer m -> Int -> m Word64
updateFilePointer FilePointer m
blobFilePointer Int
blobsize
    -- If an exception happens while writing the blob, the bytes in the file
    -- might be corrupted.
    HasFS m h -> Ref (BlobFile m h) -> SerialisedBlob -> Word64 -> m ()
forall (m :: * -> *) h.
(MonadThrow m, PrimMonad m) =>
HasFS m h -> Ref (BlobFile m h) -> SerialisedBlob -> Word64 -> m ()
BlobFile.writeBlob HasFS m h
fs Ref (BlobFile m h)
blobFile SerialisedBlob
blob Word64
bloboffset
    BlobSpan -> m BlobSpan
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return BlobSpan {
      blobSpanOffset :: Word64
blobSpanOffset = Word64
bloboffset,
      blobSpanSize :: Word32
blobSpanSize   = Int -> Word32
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
blobsize
    }

-- | Helper function to make a 'RawBlobRef' that points into a
-- 'WriteBufferBlobs'.
--
-- This function should only be used on the result of 'addBlob' on the same
-- 'WriteBufferBlobs'. For example:
--
-- @
--  'addBlob' hfs wbb blob >>= \\span -> pure ('mkRawBlobRef' wbb span)
-- @
mkRawBlobRef :: Ref (WriteBufferBlobs m h)
             -> BlobSpan
             -> RawBlobRef m h
mkRawBlobRef :: forall (m :: * -> *) h.
Ref (WriteBufferBlobs m h) -> BlobSpan -> RawBlobRef m h
mkRawBlobRef (DeRef WriteBufferBlobs {blobFile :: forall (m :: * -> *) h. WriteBufferBlobs m h -> Ref (BlobFile m h)
blobFile = DeRef BlobFile m h
blobfile}) BlobSpan
blobspan =
    RawBlobRef {
      rawBlobRefFile :: BlobFile m h
rawBlobRefFile = BlobFile m h
blobfile,
      rawBlobRefSpan :: BlobSpan
rawBlobRefSpan = BlobSpan
blobspan
    }

-- | Helper function to make a 'WeakBlobRef' that points into a
-- 'WriteBufferBlobs'.
--
-- This function should only be used on the result of 'addBlob' on the same
-- 'WriteBufferBlobs'. For example:
--
-- @
--  'addBlob' hfs wbb blob >>= \\span -> pure ('mkWeakBlobRef' wbb span)
-- @
mkWeakBlobRef :: Ref (WriteBufferBlobs m h)
              -> BlobSpan
              -> WeakBlobRef m h
mkWeakBlobRef :: forall (m :: * -> *) h.
Ref (WriteBufferBlobs m h) -> BlobSpan -> WeakBlobRef m h
mkWeakBlobRef (DeRef WriteBufferBlobs {Ref (BlobFile m h)
blobFile :: forall (m :: * -> *) h. WriteBufferBlobs m h -> Ref (BlobFile m h)
blobFile :: Ref (BlobFile m h)
blobFile}) BlobSpan
blobspan =
    WeakBlobRef {
      weakBlobRefFile :: WeakRef (BlobFile m h)
weakBlobRefFile = Ref (BlobFile m h) -> WeakRef (BlobFile m h)
forall obj. Ref obj -> WeakRef obj
mkWeakRef Ref (BlobFile m h)
blobFile,
      weakBlobRefSpan :: BlobSpan
weakBlobRefSpan = BlobSpan
blobspan
    }


-- | A mutable file offset, suitable to share between threads.
--
-- This pointer is limited to 31-bit file offsets on 32-bit systems. This should
-- be a sufficiently large limit that we never reach it in practice.
newtype FilePointer m = FilePointer (PrimVar (PrimState m) Int)

instance NFData (FilePointer m) where
  rnf :: FilePointer m -> ()
rnf (FilePointer PrimVar (PrimState m) Int
var) = PrimVar (PrimState m) Int
var PrimVar (PrimState m) Int -> () -> ()
forall a b. a -> b -> b
`seq` ()

{-# SPECIALISE newFilePointer :: IO (FilePointer IO) #-}
newFilePointer :: PrimMonad m => m (FilePointer m)
newFilePointer :: forall (m :: * -> *). PrimMonad m => m (FilePointer m)
newFilePointer = PrimVar (PrimState m) Int -> FilePointer m
forall (m :: * -> *). PrimVar (PrimState m) Int -> FilePointer m
FilePointer (PrimVar (PrimState m) Int -> FilePointer m)
-> m (PrimVar (PrimState m) Int) -> m (FilePointer m)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Int -> m (PrimVar (PrimState m) Int)
forall (m :: * -> *) a.
(PrimMonad m, Prim a) =>
a -> m (PrimVar (PrimState m) a)
P.newPrimVar Int
0

{-# SPECIALISE updateFilePointer :: FilePointer IO -> Int -> IO Word64 #-}
-- | Update the file offset by a given amount and return the new offset. This
-- is safe to use concurrently.
--
updateFilePointer :: PrimMonad m => FilePointer m -> Int -> m Word64
updateFilePointer :: forall (m :: * -> *).
PrimMonad m =>
FilePointer m -> Int -> m Word64
updateFilePointer (FilePointer PrimVar (PrimState m) Int
var) Int
n = Int -> Word64
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int -> Word64) -> m Int -> m Word64
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> PrimVar (PrimState m) Int -> Int -> m Int
forall (m :: * -> *).
PrimMonad m =>
PrimVar (PrimState m) Int -> Int -> m Int
P.fetchAddInt PrimVar (PrimState m) Int
var Int
n