{-# LANGUAGE DataKinds          #-}
{-# LANGUAGE DeriveAnyClass     #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE DerivingVia        #-}
{-# LANGUAGE RecordWildCards    #-}
{-# OPTIONS_HADDOCK not-home #-}

-- | Runs of sorted key\/value data.
module Database.LSMTree.Internal.Run (
    -- * Run
    Run (Run, runIndex, runHasFS, runHasBlockIO, runRunDataCaching,
         runBlobFile, runFilter, runKOpsFile)
  , RunFsPaths
  , size
  , sizeInPages
  , runFsPaths
  , runFsPathsNumber
  , runDataCaching
  , runIndexType
  , mkRawBlobRef
  , mkWeakBlobRef
    -- ** Run creation
  , newEmpty
  , fromBuilder
  , fromWriteBuffer
  , RunParams (..)
    -- * Snapshot
  , openFromDisk
  , RunDataCaching (..)
  , IndexType (..)
  ) where

import           Control.DeepSeq (NFData (..), rwhnf)
import           Control.Monad.Class.MonadST (MonadST)
import           Control.Monad.Class.MonadSTM (MonadSTM (..))
import           Control.Monad.Class.MonadThrow
import           Control.Monad.Primitive
import           Control.RefCount
import           Data.BloomFilter (Bloom)
import qualified Data.ByteString.Short as SBS
import           Data.Foldable (for_)
import           Database.LSMTree.Internal.BlobFile
import           Database.LSMTree.Internal.BlobRef hiding (mkRawBlobRef,
                     mkWeakBlobRef)
import qualified Database.LSMTree.Internal.BlobRef as BlobRef
import           Database.LSMTree.Internal.BloomFilter (bloomFilterFromSBS)
import qualified Database.LSMTree.Internal.CRC32C as CRC
import           Database.LSMTree.Internal.Entry (NumEntries (..))
import           Database.LSMTree.Internal.Index (Index, IndexType (..))
import qualified Database.LSMTree.Internal.Index as Index
import           Database.LSMTree.Internal.Page (NumPages)
import           Database.LSMTree.Internal.Paths as Paths
import           Database.LSMTree.Internal.RunBuilder (RunBuilder,
                     RunDataCaching (..), RunParams (..))
import qualified Database.LSMTree.Internal.RunBuilder as Builder
import           Database.LSMTree.Internal.RunNumber
import           Database.LSMTree.Internal.Serialise
import           Database.LSMTree.Internal.WriteBuffer (WriteBuffer)
import qualified Database.LSMTree.Internal.WriteBuffer as WB
import           Database.LSMTree.Internal.WriteBufferBlobs (WriteBufferBlobs)
import qualified Database.LSMTree.Internal.WriteBufferBlobs as WBB
import qualified System.FS.API as FS
import           System.FS.API (HasFS)
import qualified System.FS.BlockIO.API as FS
import           System.FS.BlockIO.API (HasBlockIO)

-- | The in-memory representation of a completed LSM run.
--
data Run m h = Run {
      -- | The number of entries stored in the run.
      runNumEntries     :: !NumEntries
      -- | The reference count for the LSM run. This counts the
      -- number of references from LSM handles to this run. When
      -- this drops to zero the run's finaliser closes the open files
      -- and removes the run's files from disk.
    , runRefCounter     :: !(RefCounter m)
      -- | The file system paths for all the files used by the run.
    , runRunFsPaths     :: !RunFsPaths
      -- | The bloom filter for the set of keys in this run.
    , runFilter         :: !(Bloom SerialisedKey)
      -- | The in-memory index mapping keys to page numbers in the
      -- Key\/Ops file. In future we may support alternative index
      -- representations.
    , runIndex          :: !Index
      -- | The file handle for the Key\/Ops file. This file is opened
      -- read-only and is accessed in a page-oriented way, i.e. only
      -- reading whole pages, at page offsets. Whether the OS page cache
      -- is bypassed for this handle depends on 'runRunDataCaching': it is
      -- bypassed for 'NoCacheRunData' and kept for 'CacheRunData'.
    , runKOpsFile       :: !(FS.Handle h)
      -- | The file handle for the BLOBs file. This file is opened
      -- read-only and is accessed in a normal style using buffered
      -- I\/O, reading arbitrary file offset and length spans.
    , runBlobFile       :: !(Ref (BlobFile m h))
      -- | The caching policy applied to 'runKOpsFile' when the run was
      -- opened. It is not stored on disk, see 'openFromDisk'.
    , runRunDataCaching :: !RunDataCaching
      -- | The file system API used to access the run's files.
    , runHasFS          :: !(HasFS m h)
      -- | The block I\/O API used for the run's files.
    , runHasBlockIO     :: !(HasBlockIO m h)
    }

-- | Shows only the 'runRunFsPaths' field; the other fields (handles, filters,
-- indexes) have no useful textual representation.
instance Show (Run m h) where
  showsPrec _ run =
      showString "Run { runRunFsPaths = "
    . shows (runRunFsPaths run)
    . showString " }"

-- | Deeply evaluates the data components of a run. The reference counter and
-- the 'HasFS' \/ 'HasBlockIO' records are only evaluated to WHNF (via 'rwhnf').
instance NFData h => NFData (Run m h) where
  rnf (Run numEntries refCounter fsPaths bloom index
           kopsFile blobFile caching hfs hbio) =
            rnf numEntries
      `seq` rwhnf refCounter
      `seq` rnf fsPaths
      `seq` rnf bloom
      `seq` rnf index
      `seq` rnf kopsFile
      `seq` rnf blobFile
      `seq` rnf caching
      `seq` rwhnf hfs
      `seq` rwhnf hbio

-- | A run's reference count lives in its 'runRefCounter' field.
instance RefCounted m (Run m h) where
    getRefCounter Run{runRefCounter = counter} = counter

-- | The number of entries in the run.
size :: Ref (Run m h) -> NumEntries
size (DeRef Run{runNumEntries = entries}) = entries

-- | The size of the run in pages, as reported by its index.
sizeInPages :: Ref (Run m h) -> NumPages
sizeInPages (DeRef run) = Index.sizeInPages index
  where
    index = runIndex run

-- | The file system paths of all files belonging to the run.
runFsPaths :: Ref (Run m h) -> RunFsPaths
runFsPaths (DeRef Run{runRunFsPaths = paths}) = paths

-- | The 'RunNumber' component of the run's file system paths.
runFsPathsNumber :: Ref (Run m h) -> RunNumber
runFsPathsNumber ref = Paths.runNumber (runFsPaths ref)

-- | The 'IndexType' of the run's in-memory index. This is not stored on disk,
-- so it must be persisted separately; see 'openFromDisk'.
runIndexType :: Ref (Run m h) -> IndexType
runIndexType (DeRef run) = Index.indexToIndexType index
  where
    index = runIndex run

-- | The 'RunDataCaching' policy the run was opened with. This is not stored
-- on disk, so it must be persisted separately; see 'openFromDisk'.
runDataCaching :: Ref (Run m h) -> RunDataCaching
runDataCaching (DeRef Run{runRunDataCaching = caching}) = caching


-- | Helper function to make a 'RawBlobRef' that points into a 'Run'.
--
-- This is a pure function, so it cannot take a new reference to the run's
-- blob file. The caller must keep the run (and hence its blob file) alive
-- for as long as the returned 'RawBlobRef' is used.
mkRawBlobRef :: Run m h -> BlobSpan -> RawBlobRef m h
mkRawBlobRef Run{runBlobFile = blobFile} bspan =
    BlobRef.mkRawBlobRef blobFile bspan

-- | Helper function to make a 'WeakBlobRef' that points into a 'Run', using
-- the run's blob file reference.
mkWeakBlobRef :: Ref (Run m h) -> BlobSpan -> WeakBlobRef m h
mkWeakBlobRef ref bspan =
    case ref of
      DeRef Run{runBlobFile = blobFile} -> BlobRef.mkWeakBlobRef blobFile bspan

{-# SPECIALISE finaliser ::
     HasFS IO h
  -> FS.Handle h
  -> Ref (BlobFile IO h)
  -> RunFsPaths
  -> IO () #-}
-- | Close the files used in the run and remove them from disk. After calling
-- this operation, the run must not be used anymore.
--
-- The steps run in this order: close the Key\/Ops handle, release the blob
-- file reference, then remove the Key\/Ops, filter, index and checksums
-- files. Every step is attempted even if an earlier one throws. A failure to
-- close the handle therefore does not also leak the blob file reference or
-- leave the other run files behind. If any step fails, an exception is
-- re-thrown after the remaining steps have run. If several steps fail, the
-- exception from the later failing step is the one that propagates.
finaliser ::
     (MonadSTM m, MonadMask m, PrimMonad m)
  => HasFS m h
  -> FS.Handle h
  -> Ref (BlobFile m h)
  -> RunFsPaths
  -> m ()
finaliser hfs kopsFile blobFile fsPaths =
    FS.hClose hfs kopsFile
      `finally` releaseRef blobFile
      `finally` FS.removeFile hfs (runKOpsPath fsPaths)
      `finally` FS.removeFile hfs (runFilterPath fsPaths)
      `finally` FS.removeFile hfs (runIndexPath fsPaths)
      `finally` FS.removeFile hfs (runChecksumsPath fsPaths)

{-# SPECIALISE setRunDataCaching ::
     HasBlockIO IO h
  -> FS.Handle h
  -> RunDataCaching
  -> IO () #-}
-- | Configure OS-level caching for a run's Key\/Ops file handle.
--
-- * 'CacheRunData': advise random access and keep using the OS page cache.
-- * 'NoCacheRunData': stop using the OS page cache for reads.
setRunDataCaching ::
     MonadSTM m
  => HasBlockIO m h
  -> FS.Handle h
  -> RunDataCaching
  -> m ()
setRunDataCaching hbio kopsFile caching =
    case caching of
      CacheRunData -> do
        -- disable file readahead (only applies to this file descriptor)
        FS.hAdviseAll hbio kopsFile FS.AdviceRandom
        -- use the page cache for disk I/O reads
        FS.hSetNoCache hbio kopsFile False
      NoCacheRunData ->
        -- do not use the page cache for disk I/O reads
        FS.hSetNoCache hbio kopsFile True

{-# SPECIALISE newEmpty ::
     HasFS IO h
  -> HasBlockIO IO h
  -> RunParams
  -> RunFsPaths
  -> IO (Ref (Run IO h)) #-}
-- | Create a 'Run' with zero entries at the given paths, by finalising a
-- fresh 'RunBuilder' into which nothing has been added.
--
-- This function should be run with asynchronous exceptions masked to prevent
-- failing after internal resources have already been created.
newEmpty ::
     (MonadST m, MonadSTM m, MonadMask m)
  => HasFS m h
  -> HasBlockIO m h
  -> RunParams
  -> RunFsPaths
  -> m (Ref (Run m h))
newEmpty hfs hbio runParams runPaths =
    Builder.new hfs hbio runParams runPaths (NumEntries 0) >>= fromBuilder

{-# SPECIALISE fromBuilder ::
     RunBuilder IO h
  -> IO (Ref (Run IO h)) #-}
-- | Finalise a 'RunBuilder' and open the resulting files as a new 'Run'.
--
-- This creates a new 'Run' which must eventually be released with
-- 'releaseRef'.
--
-- Some steps can throw after the Key\/Ops file handle or the blob file have
-- been opened: opening the blob file, configuring caching, or creating the
-- reference. In that case, the resources already acquired here are released
-- before the exception is re-thrown. The run's files on disk are left in
-- place.
--
-- TODO: make fully exception safe; this still relies on the caller masking
-- asynchronous exceptions.
fromBuilder ::
     (MonadST m, MonadSTM m, MonadMask m)
  => RunBuilder m h
  -> m (Ref (Run m h))
fromBuilder builder = do
    (runHasFS, runHasBlockIO,
     runRunFsPaths, runFilter, runIndex,
     RunParams {runParamCaching = runRunDataCaching}, runNumEntries) <-
      Builder.unsafeFinalise builder
    runKOpsFile <- FS.hOpen runHasFS (runKOpsPath runRunFsPaths) FS.ReadMode
    -- TODO: openBlobFile should be called with exceptions masked
    runBlobFile <-
      openBlobFile runHasFS (runBlobPath runRunFsPaths) FS.ReadMode
        `onException` FS.hClose runHasFS runKOpsFile
    -- The finaliser only takes ownership of these resources once 'newRef'
    -- has returned, so release them here if anything before that throws.
    let releaseOpened =
          FS.hClose runHasFS runKOpsFile `finally` releaseRef runBlobFile
    setRunDataCaching runHasBlockIO runKOpsFile runRunDataCaching
      `onException` releaseOpened
    newRef (finaliser runHasFS runKOpsFile runBlobFile runRunFsPaths)
           (\runRefCounter -> Run { .. })
      `onException` releaseOpened

{-# SPECIALISE fromWriteBuffer ::
     HasFS IO h
  -> HasBlockIO IO h
  -> RunParams
  -> RunFsPaths
  -> WriteBuffer
  -> Ref (WriteBufferBlobs IO h)
  -> IO (Ref (Run IO h)) #-}
-- | Write a write buffer to disk, including the blobs it contains.
--
-- Every entry of the buffer is added to a fresh 'RunBuilder'. Any blob spans
-- are turned into raw blob references into the write buffer's blob store.
-- The builder is then finalised into a 'Run'.
--
-- This creates a new 'Run' which must eventually be released with 'releaseRef'.
--
-- TODO: As a possible optimisation, blobs could be written to a blob file
-- immediately when they are added to the write buffer, avoiding the need to do
-- it here.
--
-- This function should be run with asynchronous exceptions masked to prevent
-- failing after internal resources have already been created.
fromWriteBuffer ::
     (MonadST m, MonadSTM m, MonadMask m)
  => HasFS m h
  -> HasBlockIO m h
  -> RunParams
  -> RunFsPaths
  -> WriteBuffer
  -> Ref (WriteBufferBlobs m h)
  -> m (Ref (Run m h))
fromWriteBuffer fs hbio params fsPaths buffer blobs = do
    builder <- Builder.new fs hbio params fsPaths (WB.numEntries buffer)
    for_ (WB.toList buffer) $ \(key, entry) ->
      --TODO: mapping over the entry here reallocates even when there are no blobs
      Builder.addKeyOp builder key (WBB.mkRawBlobRef blobs <$> entry)
    fromBuilder builder

{-------------------------------------------------------------------------------
  Snapshot
-------------------------------------------------------------------------------}

{-# SPECIALISE openFromDisk ::
     HasFS IO h
  -> HasBlockIO IO h
  -> RunDataCaching
  -> IndexType
  -> RunFsPaths
  -> IO (Ref (Run IO h)) #-}
-- | Load a previously written run from disk, checking each file's checksum
-- against the checksum file.
--
-- This creates a new 'Run' which must eventually be released with 'releaseRef'.
--
-- Exceptions will be raised when any of the files' contents don't match their
-- checksum ('ChecksumError') or can't be parsed ('FileFormatError').
--
-- The 'RunDataCaching' and 'IndexType' parameters need to be saved and
-- restored separately because these are not stored in the on-disk
-- representation. Use 'runDataCaching' and 'runIndexType' to obtain these
-- parameters from the open run before persisting to disk.
--
-- An exception may be raised after the Key\/Ops file handle or the blob file
-- have been opened, but before the new 'Run' reference has been created. In
-- that case those resources are released before the exception is re-thrown.
--
-- TODO: it may make more sense to persist these parameters with the run's
-- on-disk representation.
--
openFromDisk ::
     forall m h.
     (MonadSTM m, MonadMask m, PrimMonad m)
  => HasFS m h
  -> HasBlockIO m h
  -> RunDataCaching
  -> IndexType
  -> RunFsPaths
  -> m (Ref (Run m h))
-- TODO: make fully exception safe; this still relies on the caller masking
-- asynchronous exceptions.
openFromDisk fs hbio runRunDataCaching indexType runRunFsPaths = do
    expectedChecksums <-
       CRC.expectValidFile (runChecksumsPath runRunFsPaths) CRC.FormatChecksumsFile
         . fromChecksumsFile
         =<< CRC.readChecksumsFile fs (runChecksumsPath runRunFsPaths)

    -- verify checksums of files we don't read yet
    let paths = pathsForRunFiles runRunFsPaths
    checkCRC runRunDataCaching (forRunKOpsRaw expectedChecksums) (forRunKOpsRaw paths)
    checkCRC runRunDataCaching (forRunBlobRaw expectedChecksums) (forRunBlobRaw paths)

    -- read and try parsing files
    runFilter <-
      CRC.expectValidFile (forRunFilterRaw paths) CRC.FormatBloomFilterFile
        . bloomFilterFromSBS
        =<< readCRC (forRunFilterRaw expectedChecksums) (forRunFilterRaw paths)
    (runNumEntries, runIndex) <-
      CRC.expectValidFile (forRunIndexRaw paths) CRC.FormatIndexFile
        . Index.fromSBS indexType
        =<< readCRC (forRunIndexRaw expectedChecksums) (forRunIndexRaw paths)

    runKOpsFile <- FS.hOpen fs (runKOpsPath runRunFsPaths) FS.ReadMode
    -- TODO: openBlobFile should be called with exceptions masked
    runBlobFile <-
      openBlobFile fs (runBlobPath runRunFsPaths) FS.ReadMode
        `onException` FS.hClose fs runKOpsFile
    -- The finaliser only takes ownership of these resources once 'newRef'
    -- has returned, so release them here if anything before that throws.
    let releaseOpened =
          FS.hClose fs runKOpsFile `finally` releaseRef runBlobFile
    setRunDataCaching hbio runKOpsFile runRunDataCaching
      `onException` releaseOpened
    newRef (finaliser fs runKOpsFile runBlobFile runRunFsPaths)
           (\runRefCounter -> Run {
               runHasFS = fs
             , runHasBlockIO = hbio
             , ..
             })
      `onException` releaseOpened
  where
    -- Note: all file data for this path is evicted from the page cache /if/ the
    -- caching argument is 'NoCacheRunData'.
    checkCRC :: RunDataCaching -> CRC.CRC32C -> FS.FsPath -> m ()
    checkCRC cache expected fp =
      CRC.checkCRC fs hbio (cache == NoCacheRunData) expected fp

    -- Note: all file data for this path is evicted from the page cache
    readCRC :: CRC.CRC32C -> FS.FsPath -> m SBS.ShortByteString
    readCRC expected fp = FS.withFile fs fp FS.ReadMode $ \h -> do
        n <- FS.hGetSize fs h
        -- double the file readahead window (only applies to this file descriptor)
        FS.hAdviseAll hbio h FS.AdviceSequential
        (sbs, !checksum) <-
          CRC.hGetExactlyCRC32C_SBS fs h (fromIntegral n) CRC.initialCRC32C
        -- drop the file from the OS page cache
        FS.hAdviseAll hbio h FS.AdviceDontNeed
        CRC.expectChecksum fp expected checksum
        return sbs