{-# OPTIONS_HADDOCK not-home #-}
module Database.LSMTree.Internal.WriteBufferReader (
readWriteBuffer
) where
import Control.Concurrent.Class.MonadMVar.Strict
import Control.Monad.Class.MonadST (MonadST (..))
import Control.Monad.Class.MonadSTM (MonadSTM (..))
import Control.Monad.Class.MonadThrow (MonadMask, MonadThrow (..),
bracketOnError)
import Control.Monad.Primitive (PrimMonad (..))
import Control.RefCount (Ref, dupRef, releaseRef)
import Data.Primitive.MutVar (MutVar, newMutVar, readMutVar,
writeMutVar)
import Data.Primitive.PrimVar
import Data.Word (Word16)
import Database.LSMTree.Internal.BlobFile (BlobFile)
import Database.LSMTree.Internal.BlobRef (RawBlobRef (..),
mkRawBlobRef)
import qualified Database.LSMTree.Internal.Entry as E
import Database.LSMTree.Internal.Lookup (ResolveSerialisedValue)
import Database.LSMTree.Internal.Paths
import Database.LSMTree.Internal.RawPage
import Database.LSMTree.Internal.RunReader (Entry (..), Result (..),
mkEntryOverflow, readDiskPage, readOverflowPages,
toFullEntry)
import Database.LSMTree.Internal.Serialise (SerialisedValue)
import Database.LSMTree.Internal.WriteBuffer (WriteBuffer)
import qualified Database.LSMTree.Internal.WriteBuffer as WB
import qualified System.FS.API as FS
import System.FS.API (HasFS)
import qualified System.FS.BlockIO.API as FS
import System.FS.BlockIO.API (HasBlockIO)
{-# SPECIALISE
  readWriteBuffer ::
       ResolveSerialisedValue
    -> HasFS IO h
    -> HasBlockIO IO h
    -> ForKOps FS.FsPath
    -> Ref (BlobFile IO h)
    -> IO WriteBuffer
  #-}
-- | Read all key\/operation entries from the file at the given path into an
-- in-memory 'WriteBuffer'.
--
-- A temporary 'WriteBufferReader' is opened with 'new' and is closed again with
-- 'close' before this function returns, including when an exception is thrown
-- while reading. Entries are drained with 'next' until it reports 'Empty'; each
-- one is converted with 'toFullEntry', its blob references are reduced to their
-- spans with 'rawBlobRefSpan', and it is inserted with 'WB.addEntry' using the
-- supplied resolve function.
--
-- NOTE(review): nothing here checks that the blob file belongs to the k\/ops
-- file being read; presumably the caller has to guarantee that -- confirm.
readWriteBuffer ::
     (MonadMVar m, MonadMask m, MonadSTM m, MonadST m)
  => ResolveSerialisedValue
  -> HasFS m h
  -> HasBlockIO m h
  -> ForKOps FS.FsPath
  -> Ref (BlobFile m h)
  -> m WriteBuffer
readWriteBuffer resolve hfs hbio kOpsPath blobFile =
    bracket (new hfs hbio kOpsPath blobFile) close readEntries
  where
    readEntries reader = go WB.empty
      where
        -- The accumulator is forced on every iteration. Left lazy, each entry
        -- would add one more suspended 'WB.addEntry' application, and the
        -- whole chain would only be evaluated after the reader was closed.
        go !acc =
          next reader >>= \case
            Empty ->
              pure acc
            ReadEntry key entry ->
              go (WB.addEntry resolve key (rawBlobRefSpan <$> toFullEntry entry) acc)
-- | Mutable cursor over the pages of a key\/operations file, used by
-- 'readWriteBuffer' to read entries one at a time via 'next'.
--
-- A value is created by 'new' and must be disposed of with 'close', which
-- closes the file handle and releases the blob file reference.
data WriteBufferReader m h = WriteBufferReader {
      -- | The page entries are currently being taken from. 'new' stores the
      -- result of the first page read here and 'next' replaces it whenever the
      -- current page has no entry at the current index. While it holds
      -- 'Nothing', 'next' returns 'Empty'.
      readerCurrentPage    :: !(MutVar (PrimState m) (Maybe RawPage))
      -- | Index within the current page of the entry that the next call to
      -- 'next' will look up. Starts at 0, is incremented for every entry
      -- returned, and is reset to 0 when a new page is loaded.
    , readerCurrentEntryNo :: !(PrimVar (PrimState m) Word16)
      -- | Handle of the key\/operations file, opened in read mode by 'new'.
    , readerKOpsHandle     :: !(FS.Handle h)
      -- | The reader's own reference to the blob file (obtained with 'dupRef'
      -- in 'new', released in 'close'). Blob references in returned entries
      -- are built from it.
    , readerBlobFile       :: !(Ref (BlobFile m h))
      -- | File system API used for opening, reading and closing the file.
    , readerHasFS          :: !(HasFS m h)
      -- | Block IO API; 'new' uses it to issue access advice for the handle.
    , readerHasBlockIO     :: !(HasBlockIO m h)
    }
{-# SPECIALISE
  new ::
       HasFS IO h
    -> HasBlockIO IO h
    -> ForKOps FS.FsPath
    -> Ref (BlobFile IO h)
    -> IO (WriteBufferReader IO h)
  #-}
-- | Open a 'WriteBufferReader' on the key\/operations file at the given path.
--
-- The file is opened in read mode, sequential access is advised for the
-- handle, a fresh reference to the blob file is taken with 'dupRef', and the
-- first page is read eagerly so that the reader is positioned on entry 0 of
-- that page (or holds no page at all if the read yields 'Nothing').
--
-- Both acquisitions are guarded by 'bracketOnError': if any later step throws,
-- the duplicated reference is released and the handle is closed before the
-- exception propagates. On success, ownership of both passes to the returned
-- reader, which must eventually be passed to 'close'.
new :: forall m h.
     (MonadMVar m, MonadST m, MonadMask m)
  => HasFS m h
  -> HasBlockIO m h
  -> ForKOps FS.FsPath
  -> Ref (BlobFile m h)
  -> m (WriteBufferReader m h)
new readerHasFS readerHasBlockIO kOpsPath blobFile =
    bracketOnError openKOps (FS.hClose readerHasFS) $ \readerKOpsHandle -> do
      FS.hAdviseAll readerHasBlockIO readerKOpsHandle FS.AdviceSequential
      bracketOnError (dupRef blobFile) releaseRef $ \readerBlobFile -> do
        readerCurrentEntryNo <- newPrimVar (0 :: Word16)
        firstPage            <- readDiskPage readerHasFS readerKOpsHandle
        readerCurrentPage    <- newMutVar firstPage
        -- The arguments and local bindings are deliberately named after the
        -- record fields so that the wildcard picks all six of them up.
        pure WriteBufferReader{..}
  where
    openKOps = FS.hOpen readerHasFS (unForKOps kOpsPath) FS.ReadMode
{-# SPECIALISE
  next ::
       WriteBufferReader IO h
    -> IO (Result IO h)
  #-}
-- | Return the next entry of the reader and advance its position.
--
-- Yields 'Empty' when the reader holds no current page, or when the current
-- page is exhausted and reading a further page yields 'Nothing'. Otherwise
-- yields 'ReadEntry' with the key and the entry, whose blob spans have been
-- turned into blob references against the reader's blob file. For an entry
-- that overflows its page, the overflow pages are read from the file as part
-- of this call.
next :: forall m h.
     (MonadSTM m, MonadST m, MonadMask m)
  => WriteBufferReader m h
  -> m (Result m h)
next WriteBufferReader {..} =
    readMutVar readerCurrentPage >>= \case
      Nothing ->
        pure Empty
      Just page -> do
        entryNo <- readPrimVar readerCurrentEntryNo
        go entryNo page
  where
    -- Look up entry number @entryNo@ in @page@, moving on to the next page of
    -- the file when the index is not present in the current one.
    go :: Word16 -> RawPage -> m (Result m h)
    go !entryNo !page =
        case rawPageIndex page entryNo of
          IndexNotPresent -> do
            -- No entry at this index: fetch another page and record it as the
            -- current one, even when there is none left.
            newPage <- readDiskPage readerHasFS readerKOpsHandle
            writeMutVar readerCurrentPage newPage
            case newPage of
              Nothing ->
                pure Empty
              Just p -> do
                writePrimVar readerCurrentEntryNo 0
                go 0 p
          IndexEntry key entry -> do
            modifyPrimVar readerCurrentEntryNo (+1)
            let entry' :: E.Entry SerialisedValue (RawBlobRef m h)
                entry' = fmap (mkRawBlobRef readerBlobFile) entry
            pure (ReadEntry key (Entry entry'))
          IndexEntryOverflow key entry lenSuffix -> do
            -- The position is advanced before the overflow pages are read, so
            -- the following call continues after this entry.
            modifyPrimVar readerCurrentEntryNo (+1)
            let entry' :: E.Entry SerialisedValue (RawBlobRef m h)
                entry' = fmap (mkRawBlobRef readerBlobFile) entry
            overflowPages <-
              readOverflowPages readerHasFS readerKOpsHandle lenSuffix
            pure (ReadEntry key (mkEntryOverflow entry' page lenSuffix overflowPages))
{-# SPECIALISE close :: WriteBufferReader IO h -> IO () #-}
-- | Dispose of a 'WriteBufferReader': close its key\/operations file handle
-- and release its reference to the blob file.
--
-- The reference is released via 'finally', so it is released even if closing
-- the handle throws.
close ::
     (MonadMask m, PrimMonad m)
  => WriteBufferReader m h
  -> m ()
close WriteBufferReader{..} =
    FS.hClose readerHasFS readerKOpsHandle
      `finally` releaseRef readerBlobFile