{-# LANGUAGE CPP #-}
{-# LANGUAGE DataKinds #-}
{-# OPTIONS_HADDOCK not-home #-}
module Database.LSMTree.Internal.Unsafe (
SessionDirDoesNotExistError (..)
, SessionDirLockedError (..)
, SessionDirCorruptedError (..)
, SessionClosedError (..)
, TableClosedError (..)
, TableCorruptedError (..)
, TableTooLargeError (..)
, TableUnionNotCompatibleError (..)
, SnapshotExistsError (..)
, SnapshotDoesNotExistError (..)
, SnapshotCorruptedError (..)
, SnapshotNotCompatibleError (..)
, BlobRefInvalidError (..)
, CursorClosedError (..)
, FileFormat (..)
, FileCorruptedError (..)
, Paths.InvalidSnapshotNameError (..)
, LSMTreeTrace (..)
, SessionId (..)
, SessionTrace (..)
, TableTrace (..)
, CursorTrace (..)
, Session (..)
, SessionState (..)
, SessionEnv (..)
, withKeepSessionOpen
, withOpenSession
, withNewSession
, withRestoreSession
, openSession
, newSession
, restoreSession
, closeSession
, Table (..)
, TableState (..)
, TableEnv (..)
, withKeepTableOpen
, ResolveSerialisedValue
, withTable
, new
, close
, lookups
, rangeLookup
, updates
, retrieveBlobs
, Cursor (..)
, CursorState (..)
, CursorEnv (..)
, OffsetKey (..)
, withCursor
, newCursor
, closeCursor
, readCursor
, readCursorWhile
, SnapshotLabel
, saveSnapshot
, openTableFromSnapshot
, deleteSnapshot
, doesSnapshotExist
, listSnapshots
, duplicate
, unions
, UnionDebt (..)
, remainingUnionDebt
, UnionCredits (..)
, supplyUnionCredits
) where
import qualified Codec.Serialise as S
import Control.ActionRegistry
import Control.Concurrent.Class.MonadMVar.Strict
import Control.Concurrent.Class.MonadSTM (MonadSTM (..))
import Control.Concurrent.Class.MonadSTM.RWVar (RWVar)
import qualified Control.Concurrent.Class.MonadSTM.RWVar as RW
import Control.DeepSeq
import Control.Monad (forM, unless, void, when, (<$!>))
import Control.Monad.Class.MonadAsync as Async
import Control.Monad.Class.MonadST (MonadST (..))
import Control.Monad.Class.MonadThrow
import Control.Monad.Primitive
import Control.RefCount
import Control.Tracer
import qualified Data.BloomFilter.Hash as Bloom
import Data.Either (fromRight)
import Data.Foldable
import Data.List.NonEmpty (NonEmpty (..))
import qualified Data.List.NonEmpty as NE
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import Data.Maybe (catMaybes, maybeToList)
import qualified Data.Set as Set
import Data.Text (Text)
import qualified Data.Text as Text
import Data.Typeable
import qualified Data.Vector as V
import Database.LSMTree.Internal.Arena (ArenaManager, newArenaManager)
import Database.LSMTree.Internal.BlobRef (WeakBlobRef (..))
import qualified Database.LSMTree.Internal.BlobRef as BlobRef
import Database.LSMTree.Internal.Config
import Database.LSMTree.Internal.Config.Override (TableConfigOverride,
overrideTableConfig)
import Database.LSMTree.Internal.CRC32C (FileCorruptedError (..),
FileFormat (..))
import qualified Database.LSMTree.Internal.Cursor as Cursor
import Database.LSMTree.Internal.Entry (Entry)
import Database.LSMTree.Internal.IncomingRun (IncomingRun (..))
import Database.LSMTree.Internal.Lookup (TableCorruptedError (..),
lookupsIO, lookupsIOWithWriteBuffer)
import Database.LSMTree.Internal.MergeSchedule
import Database.LSMTree.Internal.MergingRun (TableTooLargeError (..))
import qualified Database.LSMTree.Internal.MergingRun as MR
import Database.LSMTree.Internal.MergingTree
import qualified Database.LSMTree.Internal.MergingTree as MT
import qualified Database.LSMTree.Internal.MergingTree.Lookup as MT
import Database.LSMTree.Internal.Paths (SessionRoot (..),
SnapshotMetaDataChecksumFile (..),
SnapshotMetaDataFile (..), SnapshotName)
import qualified Database.LSMTree.Internal.Paths as Paths
import Database.LSMTree.Internal.Range (Range (..))
import Database.LSMTree.Internal.Readers (OffsetKey (..))
import qualified Database.LSMTree.Internal.Readers as Readers
import Database.LSMTree.Internal.Run (Run)
import qualified Database.LSMTree.Internal.Run as Run
import Database.LSMTree.Internal.RunNumber
import Database.LSMTree.Internal.Serialise (ResolveSerialisedValue,
SerialisedBlob (..), SerialisedKey, SerialisedValue)
import Database.LSMTree.Internal.Snapshot
import Database.LSMTree.Internal.Snapshot.Codec
import Database.LSMTree.Internal.UniqCounter
import qualified Database.LSMTree.Internal.Vector as V
import qualified Database.LSMTree.Internal.WriteBuffer as WB
import qualified Database.LSMTree.Internal.WriteBufferBlobs as WBB
import qualified System.FS.API as FS
import System.FS.API (FsError, FsErrorPath (..), FsPath, HasFS)
import qualified System.FS.API.Lazy as FS
import qualified System.FS.BlockIO.API as FS
import System.FS.BlockIO.API (HasBlockIO)
data LSMTreeTrace =
TraceSession
SessionId
SessionTrace
| TraceTable
TableId
TableTrace
| TraceCursor
CursorId
CursorTrace
deriving stock Int -> LSMTreeTrace -> ShowS
[LSMTreeTrace] -> ShowS
LSMTreeTrace -> String
(Int -> LSMTreeTrace -> ShowS)
-> (LSMTreeTrace -> String)
-> ([LSMTreeTrace] -> ShowS)
-> Show LSMTreeTrace
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> LSMTreeTrace -> ShowS
showsPrec :: Int -> LSMTreeTrace -> ShowS
$cshow :: LSMTreeTrace -> String
show :: LSMTreeTrace -> String
$cshowList :: [LSMTreeTrace] -> ShowS
showList :: [LSMTreeTrace] -> ShowS
Show
newtype SessionId = SessionId FsPath
deriving stock (SessionId -> SessionId -> Bool
(SessionId -> SessionId -> Bool)
-> (SessionId -> SessionId -> Bool) -> Eq SessionId
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: SessionId -> SessionId -> Bool
== :: SessionId -> SessionId -> Bool
$c/= :: SessionId -> SessionId -> Bool
/= :: SessionId -> SessionId -> Bool
Eq, Int -> SessionId -> ShowS
[SessionId] -> ShowS
SessionId -> String
(Int -> SessionId -> ShowS)
-> (SessionId -> String)
-> ([SessionId] -> ShowS)
-> Show SessionId
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> SessionId -> ShowS
showsPrec :: Int -> SessionId -> ShowS
$cshow :: SessionId -> String
show :: SessionId -> String
$cshowList :: [SessionId] -> ShowS
showList :: [SessionId] -> ShowS
Show)
data SessionTrace =
TraceOpenSession
| TraceNewSession
| TraceRestoreSession
| TraceCreatedSession
| TraceCloseSession
| TraceClosedSession
| TraceDeleteSnapshot SnapshotName
| TraceDeletedSnapshot SnapshotName
| TraceListSnapshots
| TraceRetrieveBlobs Int
deriving stock Int -> SessionTrace -> ShowS
[SessionTrace] -> ShowS
SessionTrace -> String
(Int -> SessionTrace -> ShowS)
-> (SessionTrace -> String)
-> ([SessionTrace] -> ShowS)
-> Show SessionTrace
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> SessionTrace -> ShowS
showsPrec :: Int -> SessionTrace -> ShowS
$cshow :: SessionTrace -> String
show :: SessionTrace -> String
$cshowList :: [SessionTrace] -> ShowS
showList :: [SessionTrace] -> ShowS
Show
data TableTrace =
TraceCreatedTable
SessionId
TableConfig
| TraceNewTable TableConfig
| TraceCloseTable
| TraceClosedTable
| TraceLookups
Int
| TraceRangeLookup (Range SerialisedKey)
| TraceUpdates
Int
| TraceUpdated
Int
| TraceOpenTableFromSnapshot SnapshotName TableConfigOverride
| TraceSaveSnapshot SnapshotName
| TraceSavedSnapshot SnapshotName
| TraceDuplicate
TableId
| TraceIncrementalUnions
(NonEmpty TableId)
| TraceRemainingUnionDebt
| TraceSupplyUnionCredits UnionCredits
| TraceSuppliedUnionCredits
UnionCredits
UnionCredits
#ifdef DEBUG_TRACES
| TraceMerge (AtLevel MergeTrace)
#endif
deriving stock Int -> TableTrace -> ShowS
[TableTrace] -> ShowS
TableTrace -> String
(Int -> TableTrace -> ShowS)
-> (TableTrace -> String)
-> ([TableTrace] -> ShowS)
-> Show TableTrace
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> TableTrace -> ShowS
showsPrec :: Int -> TableTrace -> ShowS
$cshow :: TableTrace -> String
show :: TableTrace -> String
$cshowList :: [TableTrace] -> ShowS
showList :: [TableTrace] -> ShowS
Show
contramapTraceMerge :: Monad m => Tracer m TableTrace -> Tracer m (AtLevel MergeTrace)
#ifdef DEBUG_TRACES
contramapTraceMerge t = TraceMerge `contramap` t
#else
contramapTraceMerge :: forall (m :: * -> *).
Monad m =>
Tracer m TableTrace -> Tracer m (AtLevel MergeTrace)
contramapTraceMerge Tracer m TableTrace
t = (AtLevel MergeTrace -> Maybe TableTrace)
-> Tracer m TableTrace -> Tracer m (AtLevel MergeTrace)
forall (m :: * -> *) a b.
Monad m =>
(a -> Maybe b) -> Tracer m b -> Tracer m a
traceMaybe (Maybe TableTrace -> AtLevel MergeTrace -> Maybe TableTrace
forall a b. a -> b -> a
const Maybe TableTrace
forall a. Maybe a
Nothing) Tracer m TableTrace
t
#endif
data CursorTrace =
TraceCreatedCursor
SessionId
| TraceNewCursor
TableId
OffsetKey
| TraceCloseCursor
| TraceClosedCursor
| TraceReadingCursor
Int
| TraceReadCursor
Int
Int
deriving stock Int -> CursorTrace -> ShowS
[CursorTrace] -> ShowS
CursorTrace -> String
(Int -> CursorTrace -> ShowS)
-> (CursorTrace -> String)
-> ([CursorTrace] -> ShowS)
-> Show CursorTrace
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> CursorTrace -> ShowS
showsPrec :: Int -> CursorTrace -> ShowS
$cshow :: CursorTrace -> String
show :: CursorTrace -> String
$cshowList :: [CursorTrace] -> ShowS
showList :: [CursorTrace] -> ShowS
Show
data Session m h = Session {
forall (m :: * -> *) h. Session m h -> RWVar m (SessionState m h)
sessionState :: !(RWVar m (SessionState m h))
, forall (m :: * -> *) h. Session m h -> Tracer m LSMTreeTrace
lsmTreeTracer :: !(Tracer m LSMTreeTrace)
, forall (m :: * -> *) h. Session m h -> Tracer m SessionTrace
sessionTracer :: !(Tracer m SessionTrace)
, forall (m :: * -> *) h. Session m h -> UniqCounter m
sessionUniqCounter :: !(UniqCounter m)
}
instance NFData (Session m h) where
rnf :: Session m h -> ()
rnf (Session RWVar m (SessionState m h)
a Tracer m LSMTreeTrace
b Tracer m SessionTrace
c UniqCounter m
d) = RWVar m (SessionState m h) -> ()
forall a. NFData a => a -> ()
rnf RWVar m (SessionState m h)
a () -> () -> ()
forall a b. a -> b -> b
`seq` Tracer m LSMTreeTrace -> ()
forall a. a -> ()
rwhnf Tracer m LSMTreeTrace
b () -> () -> ()
forall a b. a -> b -> b
`seq` Tracer m SessionTrace -> ()
forall a. a -> ()
rwhnf Tracer m SessionTrace
c () -> () -> ()
forall a b. a -> b -> b
`seq` UniqCounter m -> ()
forall a. NFData a => a -> ()
rnf UniqCounter m
d
data SessionState m h =
SessionOpen !(SessionEnv m h)
| SessionClosed
data SessionEnv m h = SessionEnv {
forall (m :: * -> *) h. SessionEnv m h -> SessionRoot
sessionRoot :: !SessionRoot
, forall (m :: * -> *) h. SessionEnv m h -> Salt
sessionSalt :: !Bloom.Salt
, forall (m :: * -> *) h. SessionEnv m h -> HasFS m h
sessionHasFS :: !(HasFS m h)
, forall (m :: * -> *) h. SessionEnv m h -> HasBlockIO m h
sessionHasBlockIO :: !(HasBlockIO m h)
, forall (m :: * -> *) h. SessionEnv m h -> LockFileHandle m
sessionLockFile :: !(FS.LockFileHandle m)
, forall (m :: * -> *) h.
SessionEnv m h -> StrictMVar m (Map TableId (Table m h))
sessionOpenTables :: !(StrictMVar m (Map TableId (Table m h)))
, forall (m :: * -> *) h.
SessionEnv m h -> StrictMVar m (Map CursorId (Cursor m h))
sessionOpenCursors :: !(StrictMVar m (Map CursorId (Cursor m h)))
}
{-# INLINE sessionId #-}
sessionId :: SessionEnv m h -> SessionId
sessionId :: forall (m :: * -> *) h. SessionEnv m h -> SessionId
sessionId = FsPath -> SessionId
SessionId (FsPath -> SessionId)
-> (SessionEnv m h -> FsPath) -> SessionEnv m h -> SessionId
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SessionRoot -> FsPath
getSessionRoot (SessionRoot -> FsPath)
-> (SessionEnv m h -> SessionRoot) -> SessionEnv m h -> FsPath
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SessionEnv m h -> SessionRoot
forall (m :: * -> *) h. SessionEnv m h -> SessionRoot
sessionRoot
data SessionClosedError
= ErrSessionClosed
deriving stock (Int -> SessionClosedError -> ShowS
[SessionClosedError] -> ShowS
SessionClosedError -> String
(Int -> SessionClosedError -> ShowS)
-> (SessionClosedError -> String)
-> ([SessionClosedError] -> ShowS)
-> Show SessionClosedError
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> SessionClosedError -> ShowS
showsPrec :: Int -> SessionClosedError -> ShowS
$cshow :: SessionClosedError -> String
show :: SessionClosedError -> String
$cshowList :: [SessionClosedError] -> ShowS
showList :: [SessionClosedError] -> ShowS
Show, SessionClosedError -> SessionClosedError -> Bool
(SessionClosedError -> SessionClosedError -> Bool)
-> (SessionClosedError -> SessionClosedError -> Bool)
-> Eq SessionClosedError
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: SessionClosedError -> SessionClosedError -> Bool
== :: SessionClosedError -> SessionClosedError -> Bool
$c/= :: SessionClosedError -> SessionClosedError -> Bool
/= :: SessionClosedError -> SessionClosedError -> Bool
Eq)
deriving anyclass (Show SessionClosedError
Typeable SessionClosedError
(Typeable SessionClosedError, Show SessionClosedError) =>
(SessionClosedError -> SomeException)
-> (SomeException -> Maybe SessionClosedError)
-> (SessionClosedError -> String)
-> Exception SessionClosedError
SomeException -> Maybe SessionClosedError
SessionClosedError -> String
SessionClosedError -> SomeException
forall e.
(Typeable e, Show e) =>
(e -> SomeException)
-> (SomeException -> Maybe e) -> (e -> String) -> Exception e
$ctoException :: SessionClosedError -> SomeException
toException :: SessionClosedError -> SomeException
$cfromException :: SomeException -> Maybe SessionClosedError
fromException :: SomeException -> Maybe SessionClosedError
$cdisplayException :: SessionClosedError -> String
displayException :: SessionClosedError -> String
Exception)
{-# INLINE withKeepSessionOpen #-}
{-# SPECIALISE withKeepSessionOpen ::
Session IO h
-> (SessionEnv IO h -> IO a)
-> IO a #-}
withKeepSessionOpen ::
(MonadSTM m, MonadThrow m)
=> Session m h
-> (SessionEnv m h -> m a)
-> m a
withKeepSessionOpen :: forall (m :: * -> *) h a.
(MonadSTM m, MonadThrow m) =>
Session m h -> (SessionEnv m h -> m a) -> m a
withKeepSessionOpen Session m h
sesh SessionEnv m h -> m a
action = RWVar m (SessionState m h) -> (SessionState m h -> m a) -> m a
forall (m :: * -> *) a b.
(MonadSTM m, MonadThrow m) =>
RWVar m a -> (a -> m b) -> m b
RW.withReadAccess (Session m h -> RWVar m (SessionState m h)
forall (m :: * -> *) h. Session m h -> RWVar m (SessionState m h)
sessionState Session m h
sesh) ((SessionState m h -> m a) -> m a)
-> (SessionState m h -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ \case
SessionState m h
SessionClosed -> SessionClosedError -> m a
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO SessionClosedError
ErrSessionClosed
SessionOpen SessionEnv m h
seshEnv -> SessionEnv m h -> m a
action SessionEnv m h
seshEnv
data SessionDirDoesNotExistError
= ErrSessionDirDoesNotExist !FsErrorPath
deriving stock (Int -> SessionDirDoesNotExistError -> ShowS
[SessionDirDoesNotExistError] -> ShowS
SessionDirDoesNotExistError -> String
(Int -> SessionDirDoesNotExistError -> ShowS)
-> (SessionDirDoesNotExistError -> String)
-> ([SessionDirDoesNotExistError] -> ShowS)
-> Show SessionDirDoesNotExistError
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> SessionDirDoesNotExistError -> ShowS
showsPrec :: Int -> SessionDirDoesNotExistError -> ShowS
$cshow :: SessionDirDoesNotExistError -> String
show :: SessionDirDoesNotExistError -> String
$cshowList :: [SessionDirDoesNotExistError] -> ShowS
showList :: [SessionDirDoesNotExistError] -> ShowS
Show, SessionDirDoesNotExistError -> SessionDirDoesNotExistError -> Bool
(SessionDirDoesNotExistError
-> SessionDirDoesNotExistError -> Bool)
-> (SessionDirDoesNotExistError
-> SessionDirDoesNotExistError -> Bool)
-> Eq SessionDirDoesNotExistError
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: SessionDirDoesNotExistError -> SessionDirDoesNotExistError -> Bool
== :: SessionDirDoesNotExistError -> SessionDirDoesNotExistError -> Bool
$c/= :: SessionDirDoesNotExistError -> SessionDirDoesNotExistError -> Bool
/= :: SessionDirDoesNotExistError -> SessionDirDoesNotExistError -> Bool
Eq)
deriving anyclass (Show SessionDirDoesNotExistError
Typeable SessionDirDoesNotExistError
(Typeable SessionDirDoesNotExistError,
Show SessionDirDoesNotExistError) =>
(SessionDirDoesNotExistError -> SomeException)
-> (SomeException -> Maybe SessionDirDoesNotExistError)
-> (SessionDirDoesNotExistError -> String)
-> Exception SessionDirDoesNotExistError
SomeException -> Maybe SessionDirDoesNotExistError
SessionDirDoesNotExistError -> String
SessionDirDoesNotExistError -> SomeException
forall e.
(Typeable e, Show e) =>
(e -> SomeException)
-> (SomeException -> Maybe e) -> (e -> String) -> Exception e
$ctoException :: SessionDirDoesNotExistError -> SomeException
toException :: SessionDirDoesNotExistError -> SomeException
$cfromException :: SomeException -> Maybe SessionDirDoesNotExistError
fromException :: SomeException -> Maybe SessionDirDoesNotExistError
$cdisplayException :: SessionDirDoesNotExistError -> String
displayException :: SessionDirDoesNotExistError -> String
Exception)
data SessionDirLockedError
= ErrSessionDirLocked !FsErrorPath
deriving stock (Int -> SessionDirLockedError -> ShowS
[SessionDirLockedError] -> ShowS
SessionDirLockedError -> String
(Int -> SessionDirLockedError -> ShowS)
-> (SessionDirLockedError -> String)
-> ([SessionDirLockedError] -> ShowS)
-> Show SessionDirLockedError
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> SessionDirLockedError -> ShowS
showsPrec :: Int -> SessionDirLockedError -> ShowS
$cshow :: SessionDirLockedError -> String
show :: SessionDirLockedError -> String
$cshowList :: [SessionDirLockedError] -> ShowS
showList :: [SessionDirLockedError] -> ShowS
Show, SessionDirLockedError -> SessionDirLockedError -> Bool
(SessionDirLockedError -> SessionDirLockedError -> Bool)
-> (SessionDirLockedError -> SessionDirLockedError -> Bool)
-> Eq SessionDirLockedError
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: SessionDirLockedError -> SessionDirLockedError -> Bool
== :: SessionDirLockedError -> SessionDirLockedError -> Bool
$c/= :: SessionDirLockedError -> SessionDirLockedError -> Bool
/= :: SessionDirLockedError -> SessionDirLockedError -> Bool
Eq)
deriving anyclass (Show SessionDirLockedError
Typeable SessionDirLockedError
(Typeable SessionDirLockedError, Show SessionDirLockedError) =>
(SessionDirLockedError -> SomeException)
-> (SomeException -> Maybe SessionDirLockedError)
-> (SessionDirLockedError -> String)
-> Exception SessionDirLockedError
SomeException -> Maybe SessionDirLockedError
SessionDirLockedError -> String
SessionDirLockedError -> SomeException
forall e.
(Typeable e, Show e) =>
(e -> SomeException)
-> (SomeException -> Maybe e) -> (e -> String) -> Exception e
$ctoException :: SessionDirLockedError -> SomeException
toException :: SessionDirLockedError -> SomeException
$cfromException :: SomeException -> Maybe SessionDirLockedError
fromException :: SomeException -> Maybe SessionDirLockedError
$cdisplayException :: SessionDirLockedError -> String
displayException :: SessionDirLockedError -> String
Exception)
data SessionDirCorruptedError
= ErrSessionDirCorrupted !Text !FsErrorPath
deriving stock (Int -> SessionDirCorruptedError -> ShowS
[SessionDirCorruptedError] -> ShowS
SessionDirCorruptedError -> String
(Int -> SessionDirCorruptedError -> ShowS)
-> (SessionDirCorruptedError -> String)
-> ([SessionDirCorruptedError] -> ShowS)
-> Show SessionDirCorruptedError
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> SessionDirCorruptedError -> ShowS
showsPrec :: Int -> SessionDirCorruptedError -> ShowS
$cshow :: SessionDirCorruptedError -> String
show :: SessionDirCorruptedError -> String
$cshowList :: [SessionDirCorruptedError] -> ShowS
showList :: [SessionDirCorruptedError] -> ShowS
Show, SessionDirCorruptedError -> SessionDirCorruptedError -> Bool
(SessionDirCorruptedError -> SessionDirCorruptedError -> Bool)
-> (SessionDirCorruptedError -> SessionDirCorruptedError -> Bool)
-> Eq SessionDirCorruptedError
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: SessionDirCorruptedError -> SessionDirCorruptedError -> Bool
== :: SessionDirCorruptedError -> SessionDirCorruptedError -> Bool
$c/= :: SessionDirCorruptedError -> SessionDirCorruptedError -> Bool
/= :: SessionDirCorruptedError -> SessionDirCorruptedError -> Bool
Eq)
deriving anyclass (Show SessionDirCorruptedError
Typeable SessionDirCorruptedError
(Typeable SessionDirCorruptedError,
Show SessionDirCorruptedError) =>
(SessionDirCorruptedError -> SomeException)
-> (SomeException -> Maybe SessionDirCorruptedError)
-> (SessionDirCorruptedError -> String)
-> Exception SessionDirCorruptedError
SomeException -> Maybe SessionDirCorruptedError
SessionDirCorruptedError -> String
SessionDirCorruptedError -> SomeException
forall e.
(Typeable e, Show e) =>
(e -> SomeException)
-> (SomeException -> Maybe e) -> (e -> String) -> Exception e
$ctoException :: SessionDirCorruptedError -> SomeException
toException :: SessionDirCorruptedError -> SomeException
$cfromException :: SomeException -> Maybe SessionDirCorruptedError
fromException :: SomeException -> Maybe SessionDirCorruptedError
$cdisplayException :: SessionDirCorruptedError -> String
displayException :: SessionDirCorruptedError -> String
Exception)
{-# INLINE withOpenSession #-}
withOpenSession ::
forall m h a.
(MonadSTM m, MonadMVar m, PrimMonad m, MonadMask m, MonadEvaluate m)
=> Tracer m LSMTreeTrace
-> HasFS m h
-> HasBlockIO m h
-> Bloom.Salt
-> FsPath
-> (Session m h -> m a)
-> m a
withOpenSession :: forall (m :: * -> *) h a.
(MonadSTM m, MonadMVar m, PrimMonad m, MonadMask m,
MonadEvaluate m) =>
Tracer m LSMTreeTrace
-> HasFS m h
-> HasBlockIO m h
-> Salt
-> FsPath
-> (Session m h -> m a)
-> m a
withOpenSession Tracer m LSMTreeTrace
tr HasFS m h
hfs HasBlockIO m h
hbio Salt
salt FsPath
dir Session m h -> m a
k = do
m (Session m h)
-> (Session m h -> m ()) -> (Session m h -> m a) -> m a
forall a b c. m a -> (a -> m b) -> (a -> m c) -> m c
forall (m :: * -> *) a b c.
MonadThrow m =>
m a -> (a -> m b) -> (a -> m c) -> m c
bracket
(Tracer m LSMTreeTrace
-> HasFS m h -> HasBlockIO m h -> Salt -> FsPath -> m (Session m h)
forall (m :: * -> *) h.
(MonadSTM m, MonadMVar m, PrimMonad m, MonadMask m,
MonadEvaluate m) =>
Tracer m LSMTreeTrace
-> HasFS m h -> HasBlockIO m h -> Salt -> FsPath -> m (Session m h)
openSession Tracer m LSMTreeTrace
tr HasFS m h
hfs HasBlockIO m h
hbio Salt
salt FsPath
dir)
Session m h -> m ()
forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadMVar m, PrimMonad m) =>
Session m h -> m ()
closeSession
Session m h -> m a
k
{-# INLINE withNewSession #-}
withNewSession ::
forall m h a.
(MonadSTM m, MonadMVar m, PrimMonad m, MonadMask m)
=> Tracer m LSMTreeTrace
-> HasFS m h
-> HasBlockIO m h
-> Bloom.Salt
-> FsPath
-> (Session m h -> m a)
-> m a
withNewSession :: forall (m :: * -> *) h a.
(MonadSTM m, MonadMVar m, PrimMonad m, MonadMask m) =>
Tracer m LSMTreeTrace
-> HasFS m h
-> HasBlockIO m h
-> Salt
-> FsPath
-> (Session m h -> m a)
-> m a
withNewSession Tracer m LSMTreeTrace
tr HasFS m h
hfs HasBlockIO m h
hbio Salt
salt FsPath
dir Session m h -> m a
k = do
m (Session m h)
-> (Session m h -> m ()) -> (Session m h -> m a) -> m a
forall a b c. m a -> (a -> m b) -> (a -> m c) -> m c
forall (m :: * -> *) a b c.
MonadThrow m =>
m a -> (a -> m b) -> (a -> m c) -> m c
bracket
(Tracer m LSMTreeTrace
-> HasFS m h -> HasBlockIO m h -> Salt -> FsPath -> m (Session m h)
forall (m :: * -> *) h.
(MonadSTM m, MonadMVar m, PrimMonad m, MonadMask m) =>
Tracer m LSMTreeTrace
-> HasFS m h -> HasBlockIO m h -> Salt -> FsPath -> m (Session m h)
newSession Tracer m LSMTreeTrace
tr HasFS m h
hfs HasBlockIO m h
hbio Salt
salt FsPath
dir)
Session m h -> m ()
forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadMVar m, PrimMonad m) =>
Session m h -> m ()
closeSession
Session m h -> m a
k
{-# INLINE withRestoreSession #-}
withRestoreSession ::
forall m h a.
(MonadSTM m, MonadMVar m, PrimMonad m, MonadMask m, MonadEvaluate m)
=> Tracer m LSMTreeTrace
-> HasFS m h
-> HasBlockIO m h
-> FsPath
-> (Session m h -> m a)
-> m a
withRestoreSession :: forall (m :: * -> *) h a.
(MonadSTM m, MonadMVar m, PrimMonad m, MonadMask m,
MonadEvaluate m) =>
Tracer m LSMTreeTrace
-> HasFS m h
-> HasBlockIO m h
-> FsPath
-> (Session m h -> m a)
-> m a
withRestoreSession Tracer m LSMTreeTrace
tr HasFS m h
hfs HasBlockIO m h
hbio FsPath
dir Session m h -> m a
k = do
m (Session m h)
-> (Session m h -> m ()) -> (Session m h -> m a) -> m a
forall a b c. m a -> (a -> m b) -> (a -> m c) -> m c
forall (m :: * -> *) a b c.
MonadThrow m =>
m a -> (a -> m b) -> (a -> m c) -> m c
bracket
(Tracer m LSMTreeTrace
-> HasFS m h -> HasBlockIO m h -> FsPath -> m (Session m h)
forall (m :: * -> *) h.
(MonadSTM m, MonadMVar m, PrimMonad m, MonadMask m,
MonadEvaluate m) =>
Tracer m LSMTreeTrace
-> HasFS m h -> HasBlockIO m h -> FsPath -> m (Session m h)
restoreSession Tracer m LSMTreeTrace
tr HasFS m h
hfs HasBlockIO m h
hbio FsPath
dir)
Session m h -> m ()
forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadMVar m, PrimMonad m) =>
Session m h -> m ()
closeSession
Session m h -> m a
k
{-# SPECIALISE openSession ::
Tracer IO LSMTreeTrace
-> HasFS IO h
-> HasBlockIO IO h
-> Bloom.Salt
-> FsPath
-> IO (Session IO h) #-}
openSession ::
forall m h.
(MonadSTM m, MonadMVar m, PrimMonad m, MonadMask m, MonadEvaluate m)
=> Tracer m LSMTreeTrace
-> HasFS m h
-> HasBlockIO m h
-> Bloom.Salt
-> FsPath
-> m (Session m h)
openSession :: forall (m :: * -> *) h.
(MonadSTM m, MonadMVar m, PrimMonad m, MonadMask m,
MonadEvaluate m) =>
Tracer m LSMTreeTrace
-> HasFS m h -> HasBlockIO m h -> Salt -> FsPath -> m (Session m h)
openSession Tracer m LSMTreeTrace
tr HasFS m h
hfs HasBlockIO m h
hbio Salt
salt FsPath
dir = do
Tracer m SessionTrace -> SessionTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith Tracer m SessionTrace
sessionTracer SessionTrace
TraceOpenSession
Bool
dirExists <- HasFS m h -> HasCallStack => FsPath -> m Bool
forall (m :: * -> *) h.
HasFS m h -> HasCallStack => FsPath -> m Bool
FS.doesDirectoryExist HasFS m h
hfs FsPath
dir
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
dirExists (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
SessionDirDoesNotExistError -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (FsErrorPath -> SessionDirDoesNotExistError
ErrSessionDirDoesNotExist (HasFS m h -> FsPath -> FsErrorPath
forall (m :: * -> *) h. HasFS m h -> FsPath -> FsErrorPath
FS.mkFsErrorPath HasFS m h
hfs FsPath
dir))
Bool
b <- HasFS m h -> FsPath -> m Bool
forall (m :: * -> *) h. Monad m => HasFS m h -> FsPath -> m Bool
isSessionDirEmpty HasFS m h
hfs FsPath
dir
if Bool
b then
Tracer m LSMTreeTrace
-> HasFS m h -> HasBlockIO m h -> Salt -> FsPath -> m (Session m h)
forall (m :: * -> *) h.
(MonadSTM m, MonadMVar m, PrimMonad m, MonadMask m) =>
Tracer m LSMTreeTrace
-> HasFS m h -> HasBlockIO m h -> Salt -> FsPath -> m (Session m h)
newSession Tracer m LSMTreeTrace
tr HasFS m h
hfs HasBlockIO m h
hbio Salt
salt FsPath
dir
else
Tracer m LSMTreeTrace
-> HasFS m h -> HasBlockIO m h -> FsPath -> m (Session m h)
forall (m :: * -> *) h.
(MonadSTM m, MonadMVar m, PrimMonad m, MonadMask m,
MonadEvaluate m) =>
Tracer m LSMTreeTrace
-> HasFS m h -> HasBlockIO m h -> FsPath -> m (Session m h)
restoreSession Tracer m LSMTreeTrace
tr HasFS m h
hfs HasBlockIO m h
hbio FsPath
dir
where
sessionTracer :: Tracer m SessionTrace
sessionTracer = SessionId -> SessionTrace -> LSMTreeTrace
TraceSession (FsPath -> SessionId
SessionId FsPath
dir) (SessionTrace -> LSMTreeTrace)
-> Tracer m LSMTreeTrace -> Tracer m SessionTrace
forall a' a. (a' -> a) -> Tracer m a -> Tracer m a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
`contramap` Tracer m LSMTreeTrace
tr
{-# SPECIALISE newSession ::
Tracer IO LSMTreeTrace
-> HasFS IO h
-> HasBlockIO IO h
-> Bloom.Salt
-> FsPath
-> IO (Session IO h) #-}
newSession ::
forall m h.
(MonadSTM m, MonadMVar m, PrimMonad m, MonadMask m)
=> Tracer m LSMTreeTrace
-> HasFS m h
-> HasBlockIO m h
-> Bloom.Salt
-> FsPath
-> m (Session m h)
newSession :: forall (m :: * -> *) h.
(MonadSTM m, MonadMVar m, PrimMonad m, MonadMask m) =>
Tracer m LSMTreeTrace
-> HasFS m h -> HasBlockIO m h -> Salt -> FsPath -> m (Session m h)
newSession Tracer m LSMTreeTrace
tr HasFS m h
hfs HasBlockIO m h
hbio Salt
salt FsPath
dir = do
Tracer m SessionTrace -> SessionTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith Tracer m SessionTrace
sessionTracer SessionTrace
TraceNewSession
(ActionRegistry m -> m (Session m h)) -> m (Session m h)
forall (m :: * -> *) a.
(PrimMonad m, MonadCatch m) =>
(ActionRegistry m -> m a) -> m a
withActionRegistry ((ActionRegistry m -> m (Session m h)) -> m (Session m h))
-> (ActionRegistry m -> m (Session m h)) -> m (Session m h)
forall a b. (a -> b) -> a -> b
$ \ActionRegistry m
reg -> do
Bool
dirExists <- HasFS m h -> HasCallStack => FsPath -> m Bool
forall (m :: * -> *) h.
HasFS m h -> HasCallStack => FsPath -> m Bool
FS.doesDirectoryExist HasFS m h
hfs FsPath
dir
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
dirExists (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
SessionDirDoesNotExistError -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (FsErrorPath -> SessionDirDoesNotExistError
ErrSessionDirDoesNotExist (HasFS m h -> FsPath -> FsErrorPath
forall (m :: * -> *) h. HasFS m h -> FsPath -> FsErrorPath
FS.mkFsErrorPath HasFS m h
hfs FsPath
dir))
LockFileHandle m
sessionFileLock <- HasFS m h
-> HasBlockIO m h
-> ActionRegistry m
-> FsPath
-> m (LockFileHandle m)
forall (m :: * -> *) h.
(MonadSTM m, PrimMonad m, MonadMask m) =>
HasFS m h
-> HasBlockIO m h
-> ActionRegistry m
-> FsPath
-> m (LockFileHandle m)
acquireSessionLock HasFS m h
hfs HasBlockIO m h
hbio ActionRegistry m
reg FsPath
lockFilePath
Bool
b <- HasFS m h -> FsPath -> m Bool
forall (m :: * -> *) h. Monad m => HasFS m h -> FsPath -> m Bool
isSessionDirEmpty HasFS m h
hfs FsPath
dir
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
b (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
SessionDirCorruptedError -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (SessionDirCorruptedError -> m ())
-> SessionDirCorruptedError -> m ()
forall a b. (a -> b) -> a -> b
$ Text -> FsErrorPath -> SessionDirCorruptedError
ErrSessionDirCorrupted
(String -> Text
Text.pack String
"Session directory is non-empty")
(HasFS m h -> FsPath -> FsErrorPath
forall (m :: * -> *) h. HasFS m h -> FsPath -> FsErrorPath
FS.mkFsErrorPath HasFS m h
hfs FsPath
dir)
ActionRegistry m -> m () -> m () -> m ()
forall (m :: * -> *) a.
(PrimMonad m, MonadMask m, HasCallStack) =>
ActionRegistry m -> m a -> m () -> m a
withRollback_ ActionRegistry m
reg
(HasFS m h -> FsPath -> OpenMode -> (Handle h -> m ()) -> m ()
forall (m :: * -> *) h a.
(HasCallStack, MonadThrow m) =>
HasFS m h -> FsPath -> OpenMode -> (Handle h -> m a) -> m a
FS.withFile HasFS m h
hfs FsPath
metadataFilePath (AllowExisting -> OpenMode
FS.WriteMode AllowExisting
FS.MustBeNew) ((Handle h -> m ()) -> m ()) -> (Handle h -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \Handle h
h ->
m Salt -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m Salt -> m ()) -> m Salt -> m ()
forall a b. (a -> b) -> a -> b
$ HasFS m h -> Handle h -> ByteString -> m Salt
forall (m :: * -> *) h.
(HasCallStack, Monad m) =>
HasFS m h -> Handle h -> ByteString -> m Salt
FS.hPutAll HasFS m h
hfs Handle h
h (ByteString -> m Salt) -> ByteString -> m Salt
forall a b. (a -> b) -> a -> b
$ Salt -> ByteString
forall a. Serialise a => a -> ByteString
S.serialise Salt
salt)
(HasFS m h -> HasCallStack => FsPath -> m ()
forall (m :: * -> *) h. HasFS m h -> HasCallStack => FsPath -> m ()
FS.removeFile HasFS m h
hfs FsPath
metadataFilePath)
ActionRegistry m -> m () -> m () -> m ()
forall (m :: * -> *) a.
(PrimMonad m, MonadMask m, HasCallStack) =>
ActionRegistry m -> m a -> m () -> m a
withRollback_ ActionRegistry m
reg
(HasFS m h -> HasCallStack => FsPath -> m ()
forall (m :: * -> *) h. HasFS m h -> HasCallStack => FsPath -> m ()
FS.createDirectory HasFS m h
hfs FsPath
activeDirPath)
(HasFS m h -> HasCallStack => FsPath -> m ()
forall (m :: * -> *) h. HasFS m h -> HasCallStack => FsPath -> m ()
FS.removeDirectoryRecursive HasFS m h
hfs FsPath
activeDirPath)
ActionRegistry m -> m () -> m () -> m ()
forall (m :: * -> *) a.
(PrimMonad m, MonadMask m, HasCallStack) =>
ActionRegistry m -> m a -> m () -> m a
withRollback_ ActionRegistry m
reg
(HasFS m h -> HasCallStack => FsPath -> m ()
forall (m :: * -> *) h. HasFS m h -> HasCallStack => FsPath -> m ()
FS.createDirectory HasFS m h
hfs FsPath
snapshotsDirPath)
(HasFS m h -> HasCallStack => FsPath -> m ()
forall (m :: * -> *) h. HasFS m h -> HasCallStack => FsPath -> m ()
FS.removeDirectoryRecursive HasFS m h
hfs FsPath
snapshotsDirPath)
Tracer m LSMTreeTrace
-> HasFS m h
-> HasBlockIO m h
-> ActionRegistry m
-> SessionRoot
-> LockFileHandle m
-> Salt
-> m (Session m h)
forall (m :: * -> *) h.
(PrimMonad m, MonadMVar m, MonadSTM m) =>
Tracer m LSMTreeTrace
-> HasFS m h
-> HasBlockIO m h
-> ActionRegistry m
-> SessionRoot
-> LockFileHandle m
-> Salt
-> m (Session m h)
mkSession Tracer m LSMTreeTrace
tr HasFS m h
hfs HasBlockIO m h
hbio ActionRegistry m
reg SessionRoot
root LockFileHandle m
sessionFileLock Salt
salt
where
sessionTracer :: Tracer m SessionTrace
sessionTracer = SessionId -> SessionTrace -> LSMTreeTrace
TraceSession (FsPath -> SessionId
SessionId FsPath
dir) (SessionTrace -> LSMTreeTrace)
-> Tracer m LSMTreeTrace -> Tracer m SessionTrace
forall a' a. (a' -> a) -> Tracer m a -> Tracer m a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
`contramap` Tracer m LSMTreeTrace
tr
root :: SessionRoot
root = FsPath -> SessionRoot
Paths.SessionRoot FsPath
dir
lockFilePath :: FsPath
lockFilePath = SessionRoot -> FsPath
Paths.lockFile SessionRoot
root
metadataFilePath :: FsPath
metadataFilePath = SessionRoot -> FsPath
Paths.metadataFile SessionRoot
root
activeDirPath :: FsPath
activeDirPath = ActiveDir -> FsPath
Paths.getActiveDir (SessionRoot -> ActiveDir
Paths.activeDir SessionRoot
root)
snapshotsDirPath :: FsPath
snapshotsDirPath = SessionRoot -> FsPath
Paths.snapshotsDir SessionRoot
root
{-# SPECIALISE restoreSession ::
Tracer IO LSMTreeTrace
-> HasFS IO h
-> HasBlockIO IO h
-> FsPath
-> IO (Session IO h) #-}
restoreSession ::
forall m h.
(MonadSTM m, MonadMVar m, PrimMonad m, MonadMask m, MonadEvaluate m)
=> Tracer m LSMTreeTrace
-> HasFS m h
-> HasBlockIO m h
-> FsPath
-> m (Session m h)
restoreSession :: forall (m :: * -> *) h.
(MonadSTM m, MonadMVar m, PrimMonad m, MonadMask m,
MonadEvaluate m) =>
Tracer m LSMTreeTrace
-> HasFS m h -> HasBlockIO m h -> FsPath -> m (Session m h)
restoreSession Tracer m LSMTreeTrace
tr HasFS m h
hfs HasBlockIO m h
hbio FsPath
dir = do
Tracer m SessionTrace -> SessionTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith Tracer m SessionTrace
sessionTracer SessionTrace
TraceRestoreSession
(ActionRegistry m -> m (Session m h)) -> m (Session m h)
forall (m :: * -> *) a.
(PrimMonad m, MonadCatch m) =>
(ActionRegistry m -> m a) -> m a
withActionRegistry ((ActionRegistry m -> m (Session m h)) -> m (Session m h))
-> (ActionRegistry m -> m (Session m h)) -> m (Session m h)
forall a b. (a -> b) -> a -> b
$ \ActionRegistry m
reg -> do
Bool
dirExists <- HasFS m h -> HasCallStack => FsPath -> m Bool
forall (m :: * -> *) h.
HasFS m h -> HasCallStack => FsPath -> m Bool
FS.doesDirectoryExist HasFS m h
hfs FsPath
dir
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
dirExists (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
SessionDirDoesNotExistError -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (FsErrorPath -> SessionDirDoesNotExistError
ErrSessionDirDoesNotExist (HasFS m h -> FsPath -> FsErrorPath
forall (m :: * -> *) h. HasFS m h -> FsPath -> FsErrorPath
FS.mkFsErrorPath HasFS m h
hfs FsPath
dir))
LockFileHandle m
sessionFileLock <- HasFS m h
-> HasBlockIO m h
-> ActionRegistry m
-> FsPath
-> m (LockFileHandle m)
forall (m :: * -> *) h.
(MonadSTM m, PrimMonad m, MonadMask m) =>
HasFS m h
-> HasBlockIO m h
-> ActionRegistry m
-> FsPath
-> m (LockFileHandle m)
acquireSessionLock HasFS m h
hfs HasBlockIO m h
hbio ActionRegistry m
reg FsPath
lockFilePath
Bool
b <- HasFS m h -> FsPath -> m Bool
forall (m :: * -> *) h. Monad m => HasFS m h -> FsPath -> m Bool
isSessionDirEmpty HasFS m h
hfs FsPath
dir
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
b (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
SessionDirCorruptedError -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (SessionDirCorruptedError -> m ())
-> SessionDirCorruptedError -> m ()
forall a b. (a -> b) -> a -> b
$ Text -> FsErrorPath -> SessionDirCorruptedError
ErrSessionDirCorrupted
(String -> Text
Text.pack String
"Session directory is empty")
(HasFS m h -> FsPath -> FsErrorPath
forall (m :: * -> *) h. HasFS m h -> FsPath -> FsErrorPath
FS.mkFsErrorPath HasFS m h
hfs FsPath
dir)
m ()
checkTopLevelDirLayout
Salt
salt <-
HasFS m h -> FsPath -> OpenMode -> (Handle h -> m Salt) -> m Salt
forall (m :: * -> *) h a.
(HasCallStack, MonadThrow m) =>
HasFS m h -> FsPath -> OpenMode -> (Handle h -> m a) -> m a
FS.withFile HasFS m h
hfs FsPath
metadataFilePath OpenMode
FS.ReadMode ((Handle h -> m Salt) -> m Salt) -> (Handle h -> m Salt) -> m Salt
forall a b. (a -> b) -> a -> b
$ \Handle h
h -> do
ByteString
bs <- HasFS m h -> Handle h -> m ByteString
forall (m :: * -> *) h.
Monad m =>
HasFS m h -> Handle h -> m ByteString
FS.hGetAll HasFS m h
hfs Handle h
h
Salt -> m Salt
forall a. a -> m a
forall (m :: * -> *) a. MonadEvaluate m => a -> m a
evaluate (Salt -> m Salt) -> Salt -> m Salt
forall a b. (a -> b) -> a -> b
$ ByteString -> Salt
forall a. Serialise a => ByteString -> a
S.deserialise ByteString
bs
HasFS m h -> HasCallStack => FsPath -> m ()
forall (m :: * -> *) h. HasFS m h -> HasCallStack => FsPath -> m ()
FS.removeDirectoryRecursive HasFS m h
hfs FsPath
activeDirPath
m () -> m () -> m ()
forall a b. m a -> m b -> m a
forall (m :: * -> *) a b. MonadThrow m => m a -> m b -> m a
`finally` HasFS m h -> HasCallStack => Bool -> FsPath -> m ()
forall (m :: * -> *) h.
HasFS m h -> HasCallStack => Bool -> FsPath -> m ()
FS.createDirectoryIfMissing HasFS m h
hfs Bool
False FsPath
activeDirPath
m ()
checkActiveDirLayout
m ()
checkSnapshotsDirLayout
Tracer m LSMTreeTrace
-> HasFS m h
-> HasBlockIO m h
-> ActionRegistry m
-> SessionRoot
-> LockFileHandle m
-> Salt
-> m (Session m h)
forall (m :: * -> *) h.
(PrimMonad m, MonadMVar m, MonadSTM m) =>
Tracer m LSMTreeTrace
-> HasFS m h
-> HasBlockIO m h
-> ActionRegistry m
-> SessionRoot
-> LockFileHandle m
-> Salt
-> m (Session m h)
mkSession Tracer m LSMTreeTrace
tr HasFS m h
hfs HasBlockIO m h
hbio ActionRegistry m
reg SessionRoot
root LockFileHandle m
sessionFileLock Salt
salt
where
sessionTracer :: Tracer m SessionTrace
sessionTracer = SessionId -> SessionTrace -> LSMTreeTrace
TraceSession (FsPath -> SessionId
SessionId FsPath
dir) (SessionTrace -> LSMTreeTrace)
-> Tracer m LSMTreeTrace -> Tracer m SessionTrace
forall a' a. (a' -> a) -> Tracer m a -> Tracer m a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
`contramap` Tracer m LSMTreeTrace
tr
root :: SessionRoot
root = FsPath -> SessionRoot
Paths.SessionRoot FsPath
dir
lockFilePath :: FsPath
lockFilePath = SessionRoot -> FsPath
Paths.lockFile SessionRoot
root
metadataFilePath :: FsPath
metadataFilePath = SessionRoot -> FsPath
Paths.metadataFile SessionRoot
root
activeDirPath :: FsPath
activeDirPath = ActiveDir -> FsPath
Paths.getActiveDir (SessionRoot -> ActiveDir
Paths.activeDir SessionRoot
root)
snapshotsDirPath :: FsPath
snapshotsDirPath = SessionRoot -> FsPath
Paths.snapshotsDir SessionRoot
root
checkTopLevelDirLayout :: m ()
checkTopLevelDirLayout = do
HasFS m h -> HasCallStack => FsPath -> m Bool
forall (m :: * -> *) h.
HasFS m h -> HasCallStack => FsPath -> m Bool
FS.doesFileExist HasFS m h
hfs FsPath
metadataFilePath m Bool -> (Bool -> m ()) -> m ()
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \Bool
b ->
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
b (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ SessionDirCorruptedError -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (SessionDirCorruptedError -> m ())
-> SessionDirCorruptedError -> m ()
forall a b. (a -> b) -> a -> b
$
Text -> FsErrorPath -> SessionDirCorruptedError
ErrSessionDirCorrupted
(String -> Text
Text.pack String
"Missing metadata file")
(HasFS m h -> FsPath -> FsErrorPath
forall (m :: * -> *) h. HasFS m h -> FsPath -> FsErrorPath
FS.mkFsErrorPath HasFS m h
hfs FsPath
metadataFilePath)
HasFS m h -> HasCallStack => FsPath -> m Bool
forall (m :: * -> *) h.
HasFS m h -> HasCallStack => FsPath -> m Bool
FS.doesDirectoryExist HasFS m h
hfs FsPath
activeDirPath m Bool -> (Bool -> m ()) -> m ()
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \Bool
b ->
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
b (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ SessionDirCorruptedError -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (SessionDirCorruptedError -> m ())
-> SessionDirCorruptedError -> m ()
forall a b. (a -> b) -> a -> b
$
Text -> FsErrorPath -> SessionDirCorruptedError
ErrSessionDirCorrupted
(String -> Text
Text.pack String
"Missing active directory")
(HasFS m h -> FsPath -> FsErrorPath
forall (m :: * -> *) h. HasFS m h -> FsPath -> FsErrorPath
FS.mkFsErrorPath HasFS m h
hfs FsPath
activeDirPath)
HasFS m h -> HasCallStack => FsPath -> m Bool
forall (m :: * -> *) h.
HasFS m h -> HasCallStack => FsPath -> m Bool
FS.doesDirectoryExist HasFS m h
hfs FsPath
snapshotsDirPath m Bool -> (Bool -> m ()) -> m ()
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \Bool
b ->
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
b (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ SessionDirCorruptedError -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (SessionDirCorruptedError -> m ())
-> SessionDirCorruptedError -> m ()
forall a b. (a -> b) -> a -> b
$
Text -> FsErrorPath -> SessionDirCorruptedError
ErrSessionDirCorrupted
(String -> Text
Text.pack String
"Missing snapshot directory")
(HasFS m h -> FsPath -> FsErrorPath
forall (m :: * -> *) h. HasFS m h -> FsPath -> FsErrorPath
FS.mkFsErrorPath HasFS m h
hfs FsPath
snapshotsDirPath)
checkActiveDirLayout :: m ()
checkActiveDirLayout = do
Set String
contents <- HasFS m h -> HasCallStack => FsPath -> m (Set String)
forall (m :: * -> *) h.
HasFS m h -> HasCallStack => FsPath -> m (Set String)
FS.listDirectory HasFS m h
hfs FsPath
activeDirPath
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (Set String -> Bool
forall a. Set a -> Bool
Set.null Set String
contents) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ SessionDirCorruptedError -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (SessionDirCorruptedError -> m ())
-> SessionDirCorruptedError -> m ()
forall a b. (a -> b) -> a -> b
$
Text -> FsErrorPath -> SessionDirCorruptedError
ErrSessionDirCorrupted
(String -> Text
Text.pack String
"Active directory is non-empty")
(HasFS m h -> FsPath -> FsErrorPath
forall (m :: * -> *) h. HasFS m h -> FsPath -> FsErrorPath
FS.mkFsErrorPath HasFS m h
hfs FsPath
activeDirPath)
checkSnapshotsDirLayout :: m ()
checkSnapshotsDirLayout = () -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
{-# SPECIALISE closeSession :: Session IO h -> IO () #-}
closeSession ::
(MonadMask m, MonadSTM m, MonadMVar m, PrimMonad m)
=> Session m h
-> m ()
closeSession :: forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadMVar m, PrimMonad m) =>
Session m h -> m ()
closeSession Session{RWVar m (SessionState m h)
sessionState :: forall (m :: * -> *) h. Session m h -> RWVar m (SessionState m h)
sessionState :: RWVar m (SessionState m h)
sessionState, Tracer m SessionTrace
sessionTracer :: forall (m :: * -> *) h. Session m h -> Tracer m SessionTrace
sessionTracer :: Tracer m SessionTrace
sessionTracer} = do
Tracer m SessionTrace -> SessionTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith Tracer m SessionTrace
sessionTracer SessionTrace
TraceCloseSession
m (SessionState m h)
-> (SessionState m h -> m ())
-> (ActionRegistry m -> SessionState m h -> m (SessionState m h))
-> m ()
forall (m :: * -> *) st.
(PrimMonad m, MonadCatch m) =>
m st -> (st -> m ()) -> (ActionRegistry m -> st -> m st) -> m ()
modifyWithActionRegistry_
(RWVar m (SessionState m h) -> m (SessionState m h)
forall (m :: * -> *) a.
(MonadSTM m, MonadCatch m) =>
RWVar m a -> m a
RW.unsafeAcquireWriteAccess RWVar m (SessionState m h)
sessionState)
(STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ())
-> (SessionState m h -> STM m ()) -> SessionState m h -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. RWVar m (SessionState m h) -> SessionState m h -> STM m ()
forall (m :: * -> *) a. MonadSTM m => RWVar m a -> a -> STM m ()
RW.unsafeReleaseWriteAccess RWVar m (SessionState m h)
sessionState)
((ActionRegistry m -> SessionState m h -> m (SessionState m h))
-> m ())
-> (ActionRegistry m -> SessionState m h -> m (SessionState m h))
-> m ()
forall a b. (a -> b) -> a -> b
$ \ActionRegistry m
reg -> \case
SessionState m h
SessionClosed -> SessionState m h -> m (SessionState m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure SessionState m h
forall (m :: * -> *) h. SessionState m h
SessionClosed
SessionOpen SessionEnv m h
seshEnv -> do
Map CursorId (Cursor m h)
cursors <-
ActionRegistry m
-> m (Map CursorId (Cursor m h))
-> (Map CursorId (Cursor m h) -> m ())
-> m (Map CursorId (Cursor m h))
forall (m :: * -> *) a.
(PrimMonad m, MonadMask m, HasCallStack) =>
ActionRegistry m -> m a -> (a -> m ()) -> m a
withRollback ActionRegistry m
reg
(StrictMVar m (Map CursorId (Cursor m h))
-> Map CursorId (Cursor m h) -> m (Map CursorId (Cursor m h))
forall (m :: * -> *) a. MonadMVar m => StrictMVar m a -> a -> m a
swapMVar (SessionEnv m h -> StrictMVar m (Map CursorId (Cursor m h))
forall (m :: * -> *) h.
SessionEnv m h -> StrictMVar m (Map CursorId (Cursor m h))
sessionOpenCursors SessionEnv m h
seshEnv) Map CursorId (Cursor m h)
forall k a. Map k a
Map.empty)
(m (Map CursorId (Cursor m h)) -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m (Map CursorId (Cursor m h)) -> m ())
-> (Map CursorId (Cursor m h) -> m (Map CursorId (Cursor m h)))
-> Map CursorId (Cursor m h)
-> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StrictMVar m (Map CursorId (Cursor m h))
-> Map CursorId (Cursor m h) -> m (Map CursorId (Cursor m h))
forall (m :: * -> *) a. MonadMVar m => StrictMVar m a -> a -> m a
swapMVar (SessionEnv m h -> StrictMVar m (Map CursorId (Cursor m h))
forall (m :: * -> *) h.
SessionEnv m h -> StrictMVar m (Map CursorId (Cursor m h))
sessionOpenCursors SessionEnv m h
seshEnv))
(Cursor m h -> m ()) -> Map CursorId (Cursor m h) -> m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (ActionRegistry m -> m () -> m ()
forall (m :: * -> *).
(PrimMonad m, HasCallStack) =>
ActionRegistry m -> m () -> m ()
delayedCommit ActionRegistry m
reg (m () -> m ()) -> (Cursor m h -> m ()) -> Cursor m h -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Cursor m h -> m ()
forall (m :: * -> *) h.
(MonadMask m, MonadMVar m, MonadSTM m, PrimMonad m) =>
Cursor m h -> m ()
closeCursor) Map CursorId (Cursor m h)
cursors
Map TableId (Table m h)
tables <-
ActionRegistry m
-> m (Map TableId (Table m h))
-> (Map TableId (Table m h) -> m ())
-> m (Map TableId (Table m h))
forall (m :: * -> *) a.
(PrimMonad m, MonadMask m, HasCallStack) =>
ActionRegistry m -> m a -> (a -> m ()) -> m a
withRollback ActionRegistry m
reg
(StrictMVar m (Map TableId (Table m h))
-> Map TableId (Table m h) -> m (Map TableId (Table m h))
forall (m :: * -> *) a. MonadMVar m => StrictMVar m a -> a -> m a
swapMVar (SessionEnv m h -> StrictMVar m (Map TableId (Table m h))
forall (m :: * -> *) h.
SessionEnv m h -> StrictMVar m (Map TableId (Table m h))
sessionOpenTables SessionEnv m h
seshEnv) Map TableId (Table m h)
forall k a. Map k a
Map.empty)
(m (Map TableId (Table m h)) -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m (Map TableId (Table m h)) -> m ())
-> (Map TableId (Table m h) -> m (Map TableId (Table m h)))
-> Map TableId (Table m h)
-> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StrictMVar m (Map TableId (Table m h))
-> Map TableId (Table m h) -> m (Map TableId (Table m h))
forall (m :: * -> *) a. MonadMVar m => StrictMVar m a -> a -> m a
swapMVar (SessionEnv m h -> StrictMVar m (Map TableId (Table m h))
forall (m :: * -> *) h.
SessionEnv m h -> StrictMVar m (Map TableId (Table m h))
sessionOpenTables SessionEnv m h
seshEnv))
(Table m h -> m ()) -> Map TableId (Table m h) -> m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (ActionRegistry m -> m () -> m ()
forall (m :: * -> *).
(PrimMonad m, HasCallStack) =>
ActionRegistry m -> m () -> m ()
delayedCommit ActionRegistry m
reg (m () -> m ()) -> (Table m h -> m ()) -> Table m h -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Table m h -> m ()
forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadMVar m, PrimMonad m) =>
Table m h -> m ()
close) Map TableId (Table m h)
tables
ActionRegistry m -> m () -> m ()
forall (m :: * -> *).
(PrimMonad m, HasCallStack) =>
ActionRegistry m -> m () -> m ()
delayedCommit ActionRegistry m
reg (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ LockFileHandle m -> m ()
forall (m :: * -> *). LockFileHandle m -> m ()
FS.hUnlock (SessionEnv m h -> LockFileHandle m
forall (m :: * -> *) h. SessionEnv m h -> LockFileHandle m
sessionLockFile SessionEnv m h
seshEnv)
ActionRegistry m -> m () -> m ()
forall (m :: * -> *).
(PrimMonad m, HasCallStack) =>
ActionRegistry m -> m () -> m ()
delayedCommit ActionRegistry m
reg (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ Tracer m SessionTrace -> SessionTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith Tracer m SessionTrace
sessionTracer SessionTrace
TraceClosedSession
SessionState m h -> m (SessionState m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure SessionState m h
forall (m :: * -> *) h. SessionState m h
SessionClosed
{-# SPECIALISE acquireSessionLock ::
HasFS IO h
-> HasBlockIO IO h
-> ActionRegistry IO
-> FsPath
-> IO (FS.LockFileHandle IO) #-}
acquireSessionLock ::
forall m h. (MonadSTM m, PrimMonad m, MonadMask m)
=> HasFS m h
-> HasBlockIO m h
-> ActionRegistry m
-> FsPath
-> m (FS.LockFileHandle m)
acquireSessionLock :: forall (m :: * -> *) h.
(MonadSTM m, PrimMonad m, MonadMask m) =>
HasFS m h
-> HasBlockIO m h
-> ActionRegistry m
-> FsPath
-> m (LockFileHandle m)
acquireSessionLock HasFS m h
hfs HasBlockIO m h
hbio ActionRegistry m
reg FsPath
lockFilePath = do
Either FsError (Maybe (LockFileHandle m))
elock <-
ActionRegistry m
-> (Either FsError (Maybe (LockFileHandle m))
-> Maybe (LockFileHandle m))
-> m (Either FsError (Maybe (LockFileHandle m)))
-> (LockFileHandle m -> m ())
-> m (Either FsError (Maybe (LockFileHandle m)))
forall (m :: * -> *) a b.
(PrimMonad m, MonadMask m, HasCallStack) =>
ActionRegistry m -> (a -> Maybe b) -> m a -> (b -> m ()) -> m a
withRollbackFun ActionRegistry m
reg
(Maybe (LockFileHandle m)
-> Either FsError (Maybe (LockFileHandle m))
-> Maybe (LockFileHandle m)
forall b a. b -> Either a b -> b
fromRight Maybe (LockFileHandle m)
forall a. Maybe a
Nothing)
m (Either FsError (Maybe (LockFileHandle m)))
acquireLock
LockFileHandle m -> m ()
forall (m :: * -> *). LockFileHandle m -> m ()
releaseLock
case Either FsError (Maybe (LockFileHandle m))
elock of
Left FsError
e
| FsErrorType
FS.FsResourceAlreadyInUse <- FsError -> FsErrorType
FS.fsErrorType FsError
e
, fsep :: FsErrorPath
fsep@(FsErrorPath Maybe MountPoint
_ FsPath
fsp) <- FsError -> FsErrorPath
FS.fsErrorPath FsError
e
, FsPath
fsp FsPath -> FsPath -> Bool
forall a. Eq a => a -> a -> Bool
== FsPath
lockFilePath
-> SessionDirLockedError -> m (LockFileHandle m)
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (FsErrorPath -> SessionDirLockedError
ErrSessionDirLocked FsErrorPath
fsep)
Left FsError
e -> FsError -> m (LockFileHandle m)
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO FsError
e
Right Maybe (LockFileHandle m)
Nothing -> SessionDirLockedError -> m (LockFileHandle m)
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (FsErrorPath -> SessionDirLockedError
ErrSessionDirLocked (HasFS m h -> FsPath -> FsErrorPath
forall (m :: * -> *) h. HasFS m h -> FsPath -> FsErrorPath
FS.mkFsErrorPath HasFS m h
hfs FsPath
lockFilePath))
Right (Just LockFileHandle m
sessionFileLock) -> LockFileHandle m -> m (LockFileHandle m)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure LockFileHandle m
sessionFileLock
where
acquireLock :: m (Either FsError (Maybe (LockFileHandle m)))
acquireLock = forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> m (Either e a)
try @m @FsError (m (Maybe (LockFileHandle m))
-> m (Either FsError (Maybe (LockFileHandle m))))
-> m (Maybe (LockFileHandle m))
-> m (Either FsError (Maybe (LockFileHandle m)))
forall a b. (a -> b) -> a -> b
$ HasBlockIO m h
-> FsPath -> LockMode -> m (Maybe (LockFileHandle m))
forall (m :: * -> *) h.
HasBlockIO m h
-> FsPath -> LockMode -> m (Maybe (LockFileHandle m))
FS.tryLockFile HasBlockIO m h
hbio FsPath
lockFilePath LockMode
FS.ExclusiveLock
releaseLock :: LockFileHandle m -> m ()
releaseLock = LockFileHandle m -> m ()
forall (m :: * -> *). LockFileHandle m -> m ()
FS.hUnlock
{-# SPECIALISE mkSession ::
Tracer IO LSMTreeTrace
-> HasFS IO h
-> HasBlockIO IO h
-> ActionRegistry IO
-> SessionRoot
-> FS.LockFileHandle IO
-> Bloom.Salt
-> IO (Session IO h) #-}
mkSession ::
(PrimMonad m, MonadMVar m, MonadSTM m)
=> Tracer m LSMTreeTrace
-> HasFS m h
-> HasBlockIO m h
-> ActionRegistry m
-> SessionRoot
-> FS.LockFileHandle m
-> Bloom.Salt
-> m (Session m h)
mkSession :: forall (m :: * -> *) h.
(PrimMonad m, MonadMVar m, MonadSTM m) =>
Tracer m LSMTreeTrace
-> HasFS m h
-> HasBlockIO m h
-> ActionRegistry m
-> SessionRoot
-> LockFileHandle m
-> Salt
-> m (Session m h)
mkSession Tracer m LSMTreeTrace
tr HasFS m h
hfs HasBlockIO m h
hbio ActionRegistry m
reg root :: SessionRoot
root@(SessionRoot FsPath
dir) LockFileHandle m
lockFile Salt
salt = do
UniqCounter m
counterVar <- Int -> m (UniqCounter m)
forall (m :: * -> *). PrimMonad m => Int -> m (UniqCounter m)
newUniqCounter Int
0
StrictMVar m (Map TableId (Table m h))
openTablesVar <- Map TableId (Table m h)
-> m (StrictMVar m (Map TableId (Table m h)))
forall (m :: * -> *) a. MonadMVar m => a -> m (StrictMVar m a)
newMVar Map TableId (Table m h)
forall k a. Map k a
Map.empty
StrictMVar m (Map CursorId (Cursor m h))
openCursorsVar <- Map CursorId (Cursor m h)
-> m (StrictMVar m (Map CursorId (Cursor m h)))
forall (m :: * -> *) a. MonadMVar m => a -> m (StrictMVar m a)
newMVar Map CursorId (Cursor m h)
forall k a. Map k a
Map.empty
RWVar m (SessionState m h)
sessionVar <- SessionState m h -> m (RWVar m (SessionState m h))
forall (m :: * -> *) a. MonadSTM m => a -> m (RWVar m a)
RW.new (SessionState m h -> m (RWVar m (SessionState m h)))
-> SessionState m h -> m (RWVar m (SessionState m h))
forall a b. (a -> b) -> a -> b
$ SessionEnv m h -> SessionState m h
forall (m :: * -> *) h. SessionEnv m h -> SessionState m h
SessionOpen (SessionEnv m h -> SessionState m h)
-> SessionEnv m h -> SessionState m h
forall a b. (a -> b) -> a -> b
$ SessionEnv {
sessionRoot :: SessionRoot
sessionRoot = SessionRoot
root
, sessionSalt :: Salt
sessionSalt = Salt
salt
, sessionHasFS :: HasFS m h
sessionHasFS = HasFS m h
hfs
, sessionHasBlockIO :: HasBlockIO m h
sessionHasBlockIO = HasBlockIO m h
hbio
, sessionLockFile :: LockFileHandle m
sessionLockFile = LockFileHandle m
lockFile
, sessionOpenTables :: StrictMVar m (Map TableId (Table m h))
sessionOpenTables = StrictMVar m (Map TableId (Table m h))
openTablesVar
, sessionOpenCursors :: StrictMVar m (Map CursorId (Cursor m h))
sessionOpenCursors = StrictMVar m (Map CursorId (Cursor m h))
openCursorsVar
}
ActionRegistry m -> m () -> m ()
forall (m :: * -> *).
(PrimMonad m, HasCallStack) =>
ActionRegistry m -> m () -> m ()
delayedCommit ActionRegistry m
reg (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
Tracer m SessionTrace -> SessionTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith Tracer m SessionTrace
sessionTracer SessionTrace
TraceCreatedSession
Session m h -> m (Session m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Session m h -> m (Session m h)) -> Session m h -> m (Session m h)
forall a b. (a -> b) -> a -> b
$! Session {
sessionState :: RWVar m (SessionState m h)
sessionState = RWVar m (SessionState m h)
sessionVar
, lsmTreeTracer :: Tracer m LSMTreeTrace
lsmTreeTracer = Tracer m LSMTreeTrace
tr
, sessionTracer :: Tracer m SessionTrace
sessionTracer = Tracer m SessionTrace
sessionTracer
, sessionUniqCounter :: UniqCounter m
sessionUniqCounter = UniqCounter m
counterVar
}
where
sessionTracer :: Tracer m SessionTrace
sessionTracer = SessionId -> SessionTrace -> LSMTreeTrace
TraceSession (FsPath -> SessionId
SessionId FsPath
dir) (SessionTrace -> LSMTreeTrace)
-> Tracer m LSMTreeTrace -> Tracer m SessionTrace
forall a' a. (a' -> a) -> Tracer m a -> Tracer m a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
`contramap` Tracer m LSMTreeTrace
tr
{-# INLINE isSessionDirEmpty #-}
isSessionDirEmpty :: Monad m => HasFS m h -> FsPath -> m Bool
isSessionDirEmpty :: forall (m :: * -> *) h. Monad m => HasFS m h -> FsPath -> m Bool
isSessionDirEmpty HasFS m h
hfs FsPath
dir = do
Set String
dirContents <- HasFS m h -> HasCallStack => FsPath -> m (Set String)
forall (m :: * -> *) h.
HasFS m h -> HasCallStack => FsPath -> m (Set String)
FS.listDirectory HasFS m h
hfs FsPath
dir
Bool -> m Bool
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Bool -> m Bool) -> Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Set String -> Bool
forall a. Set a -> Bool
Set.null Set String
dirContents Bool -> Bool -> Bool
|| Set String
dirContents Set String -> Set String -> Bool
forall a. Eq a => a -> a -> Bool
== String -> Set String
forall a. a -> Set a
Set.singleton String
Paths.lockFileName
data Table m h = Table {
forall (m :: * -> *) h. Table m h -> TableConfig
tableConfig :: !TableConfig
, forall (m :: * -> *) h. Table m h -> RWVar m (TableState m h)
tableState :: !(RWVar m (TableState m h))
, forall (m :: * -> *) h. Table m h -> ArenaManager (PrimState m)
tableArenaManager :: !(ArenaManager (PrimState m))
, forall (m :: * -> *) h. Table m h -> Tracer m TableTrace
tableTracer :: !(Tracer m TableTrace)
, forall (m :: * -> *) h. Table m h -> TableId
tableId :: !TableId
, forall (m :: * -> *) h. Table m h -> Session m h
tableSession :: !(Session m h)
}
instance NFData (Table m h) where
rnf :: Table m h -> ()
rnf (Table TableConfig
a RWVar m (TableState m h)
b ArenaManager (PrimState m)
c Tracer m TableTrace
d TableId
e Session m h
f) =
TableConfig -> ()
forall a. NFData a => a -> ()
rnf TableConfig
a () -> () -> ()
forall a b. a -> b -> b
`seq` RWVar m (TableState m h) -> ()
forall a. NFData a => a -> ()
rnf RWVar m (TableState m h)
b () -> () -> ()
forall a b. a -> b -> b
`seq` ArenaManager (PrimState m) -> ()
forall a. NFData a => a -> ()
rnf ArenaManager (PrimState m)
c () -> () -> ()
forall a b. a -> b -> b
`seq` Tracer m TableTrace -> ()
forall a. a -> ()
rwhnf Tracer m TableTrace
d () -> () -> ()
forall a b. a -> b -> b
`seq` TableId -> ()
forall a. NFData a => a -> ()
rnf TableId
e() -> () -> ()
forall a b. a -> b -> b
`seq` Session m h -> ()
forall a. a -> ()
rwhnf Session m h
f
data TableState m h =
TableOpen !(TableEnv m h)
| TableClosed
data TableEnv m h = TableEnv {
forall (m :: * -> *) h. TableEnv m h -> SessionEnv m h
tableSessionEnv :: !(SessionEnv m h)
, forall (m :: * -> *) h. TableEnv m h -> RWVar m (TableContent m h)
tableContent :: !(RWVar m (TableContent m h))
}
{-# INLINE tableSessionRoot #-}
tableSessionRoot :: TableEnv m h -> SessionRoot
tableSessionRoot :: forall (m :: * -> *) h. TableEnv m h -> SessionRoot
tableSessionRoot = SessionEnv m h -> SessionRoot
forall (m :: * -> *) h. SessionEnv m h -> SessionRoot
sessionRoot (SessionEnv m h -> SessionRoot)
-> (TableEnv m h -> SessionEnv m h) -> TableEnv m h -> SessionRoot
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TableEnv m h -> SessionEnv m h
forall (m :: * -> *) h. TableEnv m h -> SessionEnv m h
tableSessionEnv
{-# INLINE tableSessionId #-}
tableSessionId :: TableEnv m h -> SessionId
tableSessionId :: forall (m :: * -> *) h. TableEnv m h -> SessionId
tableSessionId = SessionEnv m h -> SessionId
forall (m :: * -> *) h. SessionEnv m h -> SessionId
sessionId (SessionEnv m h -> SessionId)
-> (TableEnv m h -> SessionEnv m h) -> TableEnv m h -> SessionId
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TableEnv m h -> SessionEnv m h
forall (m :: * -> *) h. TableEnv m h -> SessionEnv m h
tableSessionEnv
{-# INLINE tableSessionSalt #-}
tableSessionSalt :: TableEnv m h -> Bloom.Salt
tableSessionSalt :: forall (m :: * -> *) h. TableEnv m h -> Salt
tableSessionSalt = SessionEnv m h -> Salt
forall (m :: * -> *) h. SessionEnv m h -> Salt
sessionSalt (SessionEnv m h -> Salt)
-> (TableEnv m h -> SessionEnv m h) -> TableEnv m h -> Salt
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TableEnv m h -> SessionEnv m h
forall (m :: * -> *) h. TableEnv m h -> SessionEnv m h
tableSessionEnv
{-# INLINE tableHasFS #-}
tableHasFS :: TableEnv m h -> HasFS m h
tableHasFS :: forall (m :: * -> *) h. TableEnv m h -> HasFS m h
tableHasFS = SessionEnv m h -> HasFS m h
forall (m :: * -> *) h. SessionEnv m h -> HasFS m h
sessionHasFS (SessionEnv m h -> HasFS m h)
-> (TableEnv m h -> SessionEnv m h) -> TableEnv m h -> HasFS m h
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TableEnv m h -> SessionEnv m h
forall (m :: * -> *) h. TableEnv m h -> SessionEnv m h
tableSessionEnv
{-# INLINE tableHasBlockIO #-}
tableHasBlockIO :: TableEnv m h -> HasBlockIO m h
tableHasBlockIO :: forall (m :: * -> *) h. TableEnv m h -> HasBlockIO m h
tableHasBlockIO = SessionEnv m h -> HasBlockIO m h
forall (m :: * -> *) h. SessionEnv m h -> HasBlockIO m h
sessionHasBlockIO (SessionEnv m h -> HasBlockIO m h)
-> (TableEnv m h -> SessionEnv m h)
-> TableEnv m h
-> HasBlockIO m h
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TableEnv m h -> SessionEnv m h
forall (m :: * -> *) h. TableEnv m h -> SessionEnv m h
tableSessionEnv
{-# INLINE tableSessionUniqCounter #-}
tableSessionUniqCounter :: Table m h -> UniqCounter m
tableSessionUniqCounter :: forall (m :: * -> *) h. Table m h -> UniqCounter m
tableSessionUniqCounter = Session m h -> UniqCounter m
forall (m :: * -> *) h. Session m h -> UniqCounter m
sessionUniqCounter (Session m h -> UniqCounter m)
-> (Table m h -> Session m h) -> Table m h -> UniqCounter m
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Table m h -> Session m h
forall (m :: * -> *) h. Table m h -> Session m h
tableSession
{-# INLINE tableSessionUntrackTable #-}
{-# SPECIALISE tableSessionUntrackTable :: TableId -> TableEnv IO h -> IO () #-}
tableSessionUntrackTable :: MonadMVar m => TableId -> TableEnv m h -> m ()
tableSessionUntrackTable :: forall (m :: * -> *) h.
MonadMVar m =>
TableId -> TableEnv m h -> m ()
tableSessionUntrackTable TableId
tableId TableEnv m h
tEnv =
StrictMVar m (Map TableId (Table m h))
-> (Map TableId (Table m h) -> m (Map TableId (Table m h))) -> m ()
forall (m :: * -> *) a.
MonadMVar m =>
StrictMVar m a -> (a -> m a) -> m ()
modifyMVar_ (SessionEnv m h -> StrictMVar m (Map TableId (Table m h))
forall (m :: * -> *) h.
SessionEnv m h -> StrictMVar m (Map TableId (Table m h))
sessionOpenTables (TableEnv m h -> SessionEnv m h
forall (m :: * -> *) h. TableEnv m h -> SessionEnv m h
tableSessionEnv TableEnv m h
tEnv)) ((Map TableId (Table m h) -> m (Map TableId (Table m h))) -> m ())
-> (Map TableId (Table m h) -> m (Map TableId (Table m h))) -> m ()
forall a b. (a -> b) -> a -> b
$ Map TableId (Table m h) -> m (Map TableId (Table m h))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Map TableId (Table m h) -> m (Map TableId (Table m h)))
-> (Map TableId (Table m h) -> Map TableId (Table m h))
-> Map TableId (Table m h)
-> m (Map TableId (Table m h))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TableId -> Map TableId (Table m h) -> Map TableId (Table m h)
forall k a. Ord k => k -> Map k a -> Map k a
Map.delete TableId
tableId
data TableClosedError
= ErrTableClosed
deriving stock (Int -> TableClosedError -> ShowS
[TableClosedError] -> ShowS
TableClosedError -> String
(Int -> TableClosedError -> ShowS)
-> (TableClosedError -> String)
-> ([TableClosedError] -> ShowS)
-> Show TableClosedError
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> TableClosedError -> ShowS
showsPrec :: Int -> TableClosedError -> ShowS
$cshow :: TableClosedError -> String
show :: TableClosedError -> String
$cshowList :: [TableClosedError] -> ShowS
showList :: [TableClosedError] -> ShowS
Show, TableClosedError -> TableClosedError -> Bool
(TableClosedError -> TableClosedError -> Bool)
-> (TableClosedError -> TableClosedError -> Bool)
-> Eq TableClosedError
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: TableClosedError -> TableClosedError -> Bool
== :: TableClosedError -> TableClosedError -> Bool
$c/= :: TableClosedError -> TableClosedError -> Bool
/= :: TableClosedError -> TableClosedError -> Bool
Eq)
deriving anyclass (Show TableClosedError
Typeable TableClosedError
(Typeable TableClosedError, Show TableClosedError) =>
(TableClosedError -> SomeException)
-> (SomeException -> Maybe TableClosedError)
-> (TableClosedError -> String)
-> Exception TableClosedError
SomeException -> Maybe TableClosedError
TableClosedError -> String
TableClosedError -> SomeException
forall e.
(Typeable e, Show e) =>
(e -> SomeException)
-> (SomeException -> Maybe e) -> (e -> String) -> Exception e
$ctoException :: TableClosedError -> SomeException
toException :: TableClosedError -> SomeException
$cfromException :: SomeException -> Maybe TableClosedError
fromException :: SomeException -> Maybe TableClosedError
$cdisplayException :: TableClosedError -> String
displayException :: TableClosedError -> String
Exception)
{-# INLINE withKeepTableOpen #-}
{-# SPECIALISE withKeepTableOpen ::
Table IO h
-> (TableEnv IO h -> IO a)
-> IO a #-}
withKeepTableOpen ::
(MonadSTM m, MonadThrow m)
=> Table m h
-> (TableEnv m h -> m a)
-> m a
withKeepTableOpen :: forall (m :: * -> *) h a.
(MonadSTM m, MonadThrow m) =>
Table m h -> (TableEnv m h -> m a) -> m a
withKeepTableOpen Table m h
t TableEnv m h -> m a
action = RWVar m (TableState m h) -> (TableState m h -> m a) -> m a
forall (m :: * -> *) a b.
(MonadSTM m, MonadThrow m) =>
RWVar m a -> (a -> m b) -> m b
RW.withReadAccess (Table m h -> RWVar m (TableState m h)
forall (m :: * -> *) h. Table m h -> RWVar m (TableState m h)
tableState Table m h
t) ((TableState m h -> m a) -> m a) -> (TableState m h -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ \case
TableState m h
TableClosed -> TableClosedError -> m a
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO TableClosedError
ErrTableClosed
TableOpen TableEnv m h
tEnv -> TableEnv m h -> m a
action TableEnv m h
tEnv
{-# SPECIALISE withTable ::
Session IO h
-> TableConfig
-> (Table IO h -> IO a)
-> IO a #-}
withTable ::
(MonadMask m, MonadSTM m, MonadMVar m, PrimMonad m)
=> Session m h
-> TableConfig
-> (Table m h -> m a)
-> m a
withTable :: forall (m :: * -> *) h a.
(MonadMask m, MonadSTM m, MonadMVar m, PrimMonad m) =>
Session m h -> TableConfig -> (Table m h -> m a) -> m a
withTable Session m h
sesh TableConfig
conf = m (Table m h) -> (Table m h -> m ()) -> (Table m h -> m a) -> m a
forall a b c. m a -> (a -> m b) -> (a -> m c) -> m c
forall (m :: * -> *) a b c.
MonadThrow m =>
m a -> (a -> m b) -> (a -> m c) -> m c
bracket (Session m h -> TableConfig -> m (Table m h)
forall (m :: * -> *) h.
(MonadSTM m, MonadMVar m, PrimMonad m, MonadMask m) =>
Session m h -> TableConfig -> m (Table m h)
new Session m h
sesh TableConfig
conf) Table m h -> m ()
forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadMVar m, PrimMonad m) =>
Table m h -> m ()
close
{-# SPECIALISE new ::
Session IO h
-> TableConfig
-> IO (Table IO h) #-}
new ::
(MonadSTM m, MonadMVar m, PrimMonad m, MonadMask m)
=> Session m h
-> TableConfig
-> m (Table m h)
new :: forall (m :: * -> *) h.
(MonadSTM m, MonadMVar m, PrimMonad m, MonadMask m) =>
Session m h -> TableConfig -> m (Table m h)
new Session m h
sesh TableConfig
conf = do
TableId
tableId <- Unique -> TableId
uniqueToTableId (Unique -> TableId) -> m Unique -> m TableId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> UniqCounter m -> m Unique
forall (m :: * -> *). PrimMonad m => UniqCounter m -> m Unique
incrUniqCounter (Session m h -> UniqCounter m
forall (m :: * -> *) h. Session m h -> UniqCounter m
sessionUniqCounter Session m h
sesh)
let tr :: Tracer m TableTrace
tr = TableId -> TableTrace -> LSMTreeTrace
TraceTable TableId
tableId (TableTrace -> LSMTreeTrace)
-> Tracer m LSMTreeTrace -> Tracer m TableTrace
forall a' a. (a' -> a) -> Tracer m a -> Tracer m a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
`contramap` Session m h -> Tracer m LSMTreeTrace
forall (m :: * -> *) h. Session m h -> Tracer m LSMTreeTrace
lsmTreeTracer Session m h
sesh
Tracer m TableTrace -> TableTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith Tracer m TableTrace
tr (TableTrace -> m ()) -> TableTrace -> m ()
forall a b. (a -> b) -> a -> b
$ TableConfig -> TableTrace
TraceNewTable TableConfig
conf
Session m h -> (SessionEnv m h -> m (Table m h)) -> m (Table m h)
forall (m :: * -> *) h a.
(MonadSTM m, MonadThrow m) =>
Session m h -> (SessionEnv m h -> m a) -> m a
withKeepSessionOpen Session m h
sesh ((SessionEnv m h -> m (Table m h)) -> m (Table m h))
-> (SessionEnv m h -> m (Table m h)) -> m (Table m h)
forall a b. (a -> b) -> a -> b
$ \SessionEnv m h
seshEnv ->
(ActionRegistry m -> m (Table m h)) -> m (Table m h)
forall (m :: * -> *) a.
(PrimMonad m, MonadCatch m) =>
(ActionRegistry m -> m a) -> m a
withActionRegistry ((ActionRegistry m -> m (Table m h)) -> m (Table m h))
-> (ActionRegistry m -> m (Table m h)) -> m (Table m h)
forall a b. (a -> b) -> a -> b
$ \ActionRegistry m
reg -> do
ArenaManager (PrimState m)
am <- m (ArenaManager (PrimState m))
forall (m :: * -> *). PrimMonad m => m (ArenaManager (PrimState m))
newArenaManager
TableContent m h
tc <- UniqCounter m
-> SessionEnv m h -> ActionRegistry m -> m (TableContent m h)
forall (m :: * -> *) h.
(PrimMonad m, MonadMask m, MonadMVar m) =>
UniqCounter m
-> SessionEnv m h -> ActionRegistry m -> m (TableContent m h)
newEmptyTableContent (Session m h -> UniqCounter m
forall (m :: * -> *) h. Session m h -> UniqCounter m
sessionUniqCounter Session m h
sesh) SessionEnv m h
seshEnv ActionRegistry m
reg
ActionRegistry m
-> Session m h
-> SessionEnv m h
-> TableConfig
-> ArenaManager (PrimState m)
-> Tracer m TableTrace
-> TableId
-> TableContent m h
-> m (Table m h)
forall (m :: * -> *) h.
(MonadSTM m, MonadMVar m, PrimMonad m) =>
ActionRegistry m
-> Session m h
-> SessionEnv m h
-> TableConfig
-> ArenaManager (PrimState m)
-> Tracer m TableTrace
-> TableId
-> TableContent m h
-> m (Table m h)
newWith ActionRegistry m
reg Session m h
sesh SessionEnv m h
seshEnv TableConfig
conf ArenaManager (PrimState m)
am Tracer m TableTrace
tr TableId
tableId TableContent m h
tc
{-# SPECIALISE newEmptyTableContent ::
UniqCounter IO
-> SessionEnv IO h
-> ActionRegistry IO
-> IO (TableContent IO h) #-}
newEmptyTableContent ::
(PrimMonad m, MonadMask m, MonadMVar m)
=> UniqCounter m
-> SessionEnv m h
-> ActionRegistry m
-> m (TableContent m h)
newEmptyTableContent :: forall (m :: * -> *) h.
(PrimMonad m, MonadMask m, MonadMVar m) =>
UniqCounter m
-> SessionEnv m h -> ActionRegistry m -> m (TableContent m h)
newEmptyTableContent UniqCounter m
uc SessionEnv m h
seshEnv ActionRegistry m
reg = do
FsPath
blobpath <- SessionRoot -> Unique -> FsPath
Paths.tableBlobPath (SessionEnv m h -> SessionRoot
forall (m :: * -> *) h. SessionEnv m h -> SessionRoot
sessionRoot SessionEnv m h
seshEnv) (Unique -> FsPath) -> m Unique -> m FsPath
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
UniqCounter m -> m Unique
forall (m :: * -> *). PrimMonad m => UniqCounter m -> m Unique
incrUniqCounter UniqCounter m
uc
let tableWriteBuffer :: WriteBuffer
tableWriteBuffer = WriteBuffer
WB.empty
Ref (WriteBufferBlobs m h)
tableWriteBufferBlobs
<- ActionRegistry m
-> m (Ref (WriteBufferBlobs m h))
-> (Ref (WriteBufferBlobs m h) -> m ())
-> m (Ref (WriteBufferBlobs m h))
forall (m :: * -> *) a.
(PrimMonad m, MonadMask m, HasCallStack) =>
ActionRegistry m -> m a -> (a -> m ()) -> m a
withRollback ActionRegistry m
reg
(HasFS m h -> FsPath -> m (Ref (WriteBufferBlobs m h))
forall (m :: * -> *) h.
(PrimMonad m, MonadMask m) =>
HasFS m h -> FsPath -> m (Ref (WriteBufferBlobs m h))
WBB.new (SessionEnv m h -> HasFS m h
forall (m :: * -> *) h. SessionEnv m h -> HasFS m h
sessionHasFS SessionEnv m h
seshEnv) FsPath
blobpath)
Ref (WriteBufferBlobs m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef
let tableLevels :: Vector a
tableLevels = Vector a
forall a. Vector a
V.empty
LevelsCache m h
tableCache <- ActionRegistry m -> Levels m h -> m (LevelsCache m h)
forall (m :: * -> *) h.
(PrimMonad m, MonadMVar m, MonadMask m) =>
ActionRegistry m -> Levels m h -> m (LevelsCache m h)
mkLevelsCache ActionRegistry m
reg Levels m h
forall a. Vector a
tableLevels
TableContent m h -> m (TableContent m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure TableContent {
WriteBuffer
tableWriteBuffer :: WriteBuffer
tableWriteBuffer :: WriteBuffer
tableWriteBuffer
, Ref (WriteBufferBlobs m h)
tableWriteBufferBlobs :: Ref (WriteBufferBlobs m h)
tableWriteBufferBlobs :: Ref (WriteBufferBlobs m h)
tableWriteBufferBlobs
, Levels m h
forall a. Vector a
tableLevels :: forall a. Vector a
tableLevels :: Levels m h
tableLevels
, LevelsCache m h
tableCache :: LevelsCache m h
tableCache :: LevelsCache m h
tableCache
, tableUnionLevel :: UnionLevel m h
tableUnionLevel = UnionLevel m h
forall (m :: * -> *) h. UnionLevel m h
NoUnion
}
{-# SPECIALISE newWith ::
ActionRegistry IO
-> Session IO h
-> SessionEnv IO h
-> TableConfig
-> ArenaManager RealWorld
-> Tracer IO TableTrace
-> TableId
-> TableContent IO h
-> IO (Table IO h) #-}
newWith ::
(MonadSTM m, MonadMVar m, PrimMonad m)
=> ActionRegistry m
-> Session m h
-> SessionEnv m h
-> TableConfig
-> ArenaManager (PrimState m)
-> Tracer m TableTrace
-> TableId
-> TableContent m h
-> m (Table m h)
newWith :: forall (m :: * -> *) h.
(MonadSTM m, MonadMVar m, PrimMonad m) =>
ActionRegistry m
-> Session m h
-> SessionEnv m h
-> TableConfig
-> ArenaManager (PrimState m)
-> Tracer m TableTrace
-> TableId
-> TableContent m h
-> m (Table m h)
newWith ActionRegistry m
reg Session m h
sesh SessionEnv m h
seshEnv TableConfig
conf !ArenaManager (PrimState m)
am !Tracer m TableTrace
tr !TableId
tableId !TableContent m h
tc = do
RWVar m (TableContent m h)
contentVar <- TableContent m h -> m (RWVar m (TableContent m h))
forall (m :: * -> *) a. MonadSTM m => a -> m (RWVar m a)
RW.new (TableContent m h -> m (RWVar m (TableContent m h)))
-> TableContent m h -> m (RWVar m (TableContent m h))
forall a b. (a -> b) -> a -> b
$ TableContent m h
tc
RWVar m (TableState m h)
tableVar <- TableState m h -> m (RWVar m (TableState m h))
forall (m :: * -> *) a. MonadSTM m => a -> m (RWVar m a)
RW.new (TableState m h -> m (RWVar m (TableState m h)))
-> TableState m h -> m (RWVar m (TableState m h))
forall a b. (a -> b) -> a -> b
$ TableEnv m h -> TableState m h
forall (m :: * -> *) h. TableEnv m h -> TableState m h
TableOpen (TableEnv m h -> TableState m h) -> TableEnv m h -> TableState m h
forall a b. (a -> b) -> a -> b
$ TableEnv {
tableSessionEnv :: SessionEnv m h
tableSessionEnv = SessionEnv m h
seshEnv
, tableContent :: RWVar m (TableContent m h)
tableContent = RWVar m (TableContent m h)
contentVar
}
let !t :: Table m h
t = TableConfig
-> RWVar m (TableState m h)
-> ArenaManager (PrimState m)
-> Tracer m TableTrace
-> TableId
-> Session m h
-> Table m h
forall (m :: * -> *) h.
TableConfig
-> RWVar m (TableState m h)
-> ArenaManager (PrimState m)
-> Tracer m TableTrace
-> TableId
-> Session m h
-> Table m h
Table TableConfig
conf RWVar m (TableState m h)
tableVar ArenaManager (PrimState m)
am Tracer m TableTrace
tr TableId
tableId Session m h
sesh
ActionRegistry m -> m () -> m ()
forall (m :: * -> *).
(PrimMonad m, HasCallStack) =>
ActionRegistry m -> m () -> m ()
delayedCommit ActionRegistry m
reg (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
StrictMVar m (Map TableId (Table m h))
-> (Map TableId (Table m h) -> m (Map TableId (Table m h))) -> m ()
forall (m :: * -> *) a.
MonadMVar m =>
StrictMVar m a -> (a -> m a) -> m ()
modifyMVar_ (SessionEnv m h -> StrictMVar m (Map TableId (Table m h))
forall (m :: * -> *) h.
SessionEnv m h -> StrictMVar m (Map TableId (Table m h))
sessionOpenTables SessionEnv m h
seshEnv) ((Map TableId (Table m h) -> m (Map TableId (Table m h))) -> m ())
-> (Map TableId (Table m h) -> m (Map TableId (Table m h))) -> m ()
forall a b. (a -> b) -> a -> b
$
Map TableId (Table m h) -> m (Map TableId (Table m h))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Map TableId (Table m h) -> m (Map TableId (Table m h)))
-> (Map TableId (Table m h) -> Map TableId (Table m h))
-> Map TableId (Table m h)
-> m (Map TableId (Table m h))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TableId
-> Table m h -> Map TableId (Table m h) -> Map TableId (Table m h)
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert TableId
tableId Table m h
t
ActionRegistry m -> m () -> m ()
forall (m :: * -> *).
(PrimMonad m, HasCallStack) =>
ActionRegistry m -> m () -> m ()
delayedCommit ActionRegistry m
reg (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
Tracer m TableTrace -> TableTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith Tracer m TableTrace
tr (TableTrace -> m ()) -> TableTrace -> m ()
forall a b. (a -> b) -> a -> b
$ SessionId -> TableConfig -> TableTrace
TraceCreatedTable (SessionEnv m h -> SessionId
forall (m :: * -> *) h. SessionEnv m h -> SessionId
sessionId SessionEnv m h
seshEnv) TableConfig
conf
Table m h -> m (Table m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Table m h -> m (Table m h)) -> Table m h -> m (Table m h)
forall a b. (a -> b) -> a -> b
$! Table m h
t
{-# SPECIALISE close :: Table IO h -> IO () #-}
close ::
(MonadMask m, MonadSTM m, MonadMVar m, PrimMonad m)
=> Table m h
-> m ()
close :: forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadMVar m, PrimMonad m) =>
Table m h -> m ()
close Table m h
t = do
Tracer m TableTrace -> TableTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith (Table m h -> Tracer m TableTrace
forall (m :: * -> *) h. Table m h -> Tracer m TableTrace
tableTracer Table m h
t) TableTrace
TraceCloseTable
m (TableState m h)
-> (TableState m h -> m ())
-> (ActionRegistry m -> TableState m h -> m (TableState m h))
-> m ()
forall (m :: * -> *) st.
(PrimMonad m, MonadCatch m) =>
m st -> (st -> m ()) -> (ActionRegistry m -> st -> m st) -> m ()
modifyWithActionRegistry_
(RWVar m (TableState m h) -> m (TableState m h)
forall (m :: * -> *) a.
(MonadSTM m, MonadCatch m) =>
RWVar m a -> m a
RW.unsafeAcquireWriteAccess (Table m h -> RWVar m (TableState m h)
forall (m :: * -> *) h. Table m h -> RWVar m (TableState m h)
tableState Table m h
t))
(STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ())
-> (TableState m h -> STM m ()) -> TableState m h -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. RWVar m (TableState m h) -> TableState m h -> STM m ()
forall (m :: * -> *) a. MonadSTM m => RWVar m a -> a -> STM m ()
RW.unsafeReleaseWriteAccess (Table m h -> RWVar m (TableState m h)
forall (m :: * -> *) h. Table m h -> RWVar m (TableState m h)
tableState Table m h
t)) ((ActionRegistry m -> TableState m h -> m (TableState m h))
-> m ())
-> (ActionRegistry m -> TableState m h -> m (TableState m h))
-> m ()
forall a b. (a -> b) -> a -> b
$ \ActionRegistry m
reg -> \case
TableState m h
TableClosed -> TableState m h -> m (TableState m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure TableState m h
forall (m :: * -> *) h. TableState m h
TableClosed
TableOpen TableEnv m h
tEnv -> do
ActionRegistry m -> m () -> m ()
forall (m :: * -> *).
(PrimMonad m, HasCallStack) =>
ActionRegistry m -> m () -> m ()
delayedCommit ActionRegistry m
reg (TableId -> TableEnv m h -> m ()
forall (m :: * -> *) h.
MonadMVar m =>
TableId -> TableEnv m h -> m ()
tableSessionUntrackTable (Table m h -> TableId
forall (m :: * -> *) h. Table m h -> TableId
tableId Table m h
t) TableEnv m h
tEnv)
RWVar m (TableContent m h)
-> (TableContent m h -> m (TableContent m h)) -> m ()
forall (m :: * -> *) a.
(MonadSTM m, MonadCatch m) =>
RWVar m a -> (a -> m a) -> m ()
RW.withWriteAccess_ (TableEnv m h -> RWVar m (TableContent m h)
forall (m :: * -> *) h. TableEnv m h -> RWVar m (TableContent m h)
tableContent TableEnv m h
tEnv) ((TableContent m h -> m (TableContent m h)) -> m ())
-> (TableContent m h -> m (TableContent m h)) -> m ()
forall a b. (a -> b) -> a -> b
$ \TableContent m h
tc -> do
ActionRegistry m -> TableContent m h -> m ()
forall (m :: * -> *) h.
(PrimMonad m, MonadMask m) =>
ActionRegistry m -> TableContent m h -> m ()
releaseTableContent ActionRegistry m
reg TableContent m h
tc
TableContent m h -> m (TableContent m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure TableContent m h
tc
ActionRegistry m -> m () -> m ()
forall (m :: * -> *).
(PrimMonad m, HasCallStack) =>
ActionRegistry m -> m () -> m ()
delayedCommit ActionRegistry m
reg (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
Tracer m TableTrace -> TableTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith (Table m h -> Tracer m TableTrace
forall (m :: * -> *) h. Table m h -> Tracer m TableTrace
tableTracer Table m h
t) TableTrace
TraceClosedTable
TableState m h -> m (TableState m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure TableState m h
forall (m :: * -> *) h. TableState m h
TableClosed
{-# SPECIALISE lookups ::
ResolveSerialisedValue
-> V.Vector SerialisedKey
-> Table IO h
-> IO (V.Vector (Maybe (Entry SerialisedValue (WeakBlobRef IO h)))) #-}
lookups ::
(MonadAsync m, MonadMask m, MonadMVar m, MonadST m)
=> ResolveSerialisedValue
-> V.Vector SerialisedKey
-> Table m h
-> m (V.Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
lookups :: forall (m :: * -> *) h.
(MonadAsync m, MonadMask m, MonadMVar m, MonadST m) =>
ResolveSerialisedValue
-> Vector SerialisedKey
-> Table m h
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
lookups ResolveSerialisedValue
resolve Vector SerialisedKey
ks Table m h
t = do
Tracer m TableTrace -> TableTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith (Table m h -> Tracer m TableTrace
forall (m :: * -> *) h. Table m h -> Tracer m TableTrace
tableTracer Table m h
t) (TableTrace -> m ()) -> TableTrace -> m ()
forall a b. (a -> b) -> a -> b
$ Int -> TableTrace
TraceLookups (Vector SerialisedKey -> Int
forall a. Vector a -> Int
V.length Vector SerialisedKey
ks)
Table m h
-> (TableEnv m h
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
forall (m :: * -> *) h a.
(MonadSTM m, MonadThrow m) =>
Table m h -> (TableEnv m h -> m a) -> m a
withKeepTableOpen Table m h
t ((TableEnv m h
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
-> (TableEnv m h
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
forall a b. (a -> b) -> a -> b
$ \TableEnv m h
tEnv ->
RWVar m (TableContent m h)
-> (TableContent m h
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
forall (m :: * -> *) a b.
(MonadSTM m, MonadThrow m) =>
RWVar m a -> (a -> m b) -> m b
RW.withReadAccess (TableEnv m h -> RWVar m (TableContent m h)
forall (m :: * -> *) h. TableEnv m h -> RWVar m (TableContent m h)
tableContent TableEnv m h
tEnv) ((TableContent m h
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
-> (TableContent m h
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
forall a b. (a -> b) -> a -> b
$ \TableContent m h
tc -> do
case TableContent m h -> UnionLevel m h
forall (m :: * -> *) h. TableContent m h -> UnionLevel m h
tableUnionLevel TableContent m h
tc of
UnionLevel m h
NoUnion -> TableEnv m h
-> TableContent m h
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
lookupsRegular TableEnv m h
tEnv TableContent m h
tc
Union Ref (MergingTree m h)
tree UnionCache m h
unionCache -> do
Ref (MergingTree m h) -> m Bool
forall (m :: * -> *) h.
MonadMVar m =>
Ref (MergingTree m h) -> m Bool
isStructurallyEmpty Ref (MergingTree m h)
tree m Bool
-> (Bool
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Bool
True -> TableEnv m h
-> TableContent m h
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
lookupsRegular TableEnv m h
tEnv TableContent m h
tc
Bool
False -> if WriteBuffer -> Bool
WB.null (TableContent m h -> WriteBuffer
forall (m :: * -> *) h. TableContent m h -> WriteBuffer
tableWriteBuffer TableContent m h
tc) Bool -> Bool -> Bool
&& Vector (Level m h) -> Bool
forall a. Vector a -> Bool
V.null (TableContent m h -> Vector (Level m h)
forall (m :: * -> *) h. TableContent m h -> Levels m h
tableLevels TableContent m h
tc)
then TableEnv m h
-> UnionCache m h
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
lookupsUnion TableEnv m h
tEnv UnionCache m h
unionCache
else TableEnv m h
-> TableContent m h
-> UnionCache m h
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
lookupsRegularAndUnion TableEnv m h
tEnv TableContent m h
tc UnionCache m h
unionCache
where
lookupsRegular :: TableEnv m h
-> TableContent m h
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
lookupsRegular TableEnv m h
tEnv TableContent m h
tc = do
let !cache :: LevelsCache m h
cache = TableContent m h -> LevelsCache m h
forall (m :: * -> *) h. TableContent m h -> LevelsCache m h
tableCache TableContent m h
tc
HasBlockIO m h
-> ArenaManager (PrimState m)
-> ResolveSerialisedValue
-> Salt
-> WriteBuffer
-> Ref (WriteBufferBlobs m h)
-> Vector (Ref (Run m h))
-> Vector (Bloom SerialisedKey)
-> Vector Index
-> Vector (Handle h)
-> Vector SerialisedKey
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
forall (m :: * -> *) h.
(MonadThrow m, MonadST m) =>
HasBlockIO m h
-> ArenaManager (PrimState m)
-> ResolveSerialisedValue
-> Salt
-> WriteBuffer
-> Ref (WriteBufferBlobs m h)
-> Vector (Ref (Run m h))
-> Vector (Bloom SerialisedKey)
-> Vector Index
-> Vector (Handle h)
-> Vector SerialisedKey
-> m (LookupAcc m h)
lookupsIOWithWriteBuffer
(TableEnv m h -> HasBlockIO m h
forall (m :: * -> *) h. TableEnv m h -> HasBlockIO m h
tableHasBlockIO TableEnv m h
tEnv)
(Table m h -> ArenaManager (PrimState m)
forall (m :: * -> *) h. Table m h -> ArenaManager (PrimState m)
tableArenaManager Table m h
t)
ResolveSerialisedValue
resolve
(TableEnv m h -> Salt
forall (m :: * -> *) h. TableEnv m h -> Salt
tableSessionSalt TableEnv m h
tEnv)
(TableContent m h -> WriteBuffer
forall (m :: * -> *) h. TableContent m h -> WriteBuffer
tableWriteBuffer TableContent m h
tc)
(TableContent m h -> Ref (WriteBufferBlobs m h)
forall (m :: * -> *) h.
TableContent m h -> Ref (WriteBufferBlobs m h)
tableWriteBufferBlobs TableContent m h
tc)
(LevelsCache m h -> Vector (Ref (Run m h))
forall (m :: * -> *) h. LevelsCache m h -> Vector (Ref (Run m h))
cachedRuns LevelsCache m h
cache)
(LevelsCache m h -> Vector (Bloom SerialisedKey)
forall (m :: * -> *) h.
LevelsCache m h -> Vector (Bloom SerialisedKey)
cachedFilters LevelsCache m h
cache)
(LevelsCache m h -> Vector Index
forall (m :: * -> *) h. LevelsCache m h -> Vector Index
cachedIndexes LevelsCache m h
cache)
(LevelsCache m h -> Vector (Handle h)
forall (m :: * -> *) h. LevelsCache m h -> Vector (Handle h)
cachedKOpsFiles LevelsCache m h
cache)
Vector SerialisedKey
ks
lookupsUnion :: TableEnv m h
-> UnionCache m h
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
lookupsUnion TableEnv m h
tEnv UnionCache m h
unionCache = do
LookupTree
(Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
treeResults <- ((Vector (Ref (Run m h))
-> m (Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))))
-> LookupTree (Vector (Ref (Run m h)))
-> m (LookupTree
(Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))))
-> LookupTree (Vector (Ref (Run m h)))
-> (Vector (Ref (Run m h))
-> m (Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))))
-> m (LookupTree
(Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))))
forall a b c. (a -> b -> c) -> b -> a -> c
flip (Vector (Ref (Run m h))
-> m (Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))))
-> LookupTree (Vector (Ref (Run m h)))
-> m (LookupTree
(Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))))
forall (m :: * -> *) a b.
Monad m =>
(a -> m b) -> LookupTree a -> m (LookupTree b)
MT.mapMStrict (UnionCache m h -> LookupTree (Vector (Ref (Run m h)))
forall (m :: * -> *) h.
UnionCache m h -> LookupTree (Vector (Ref (Run m h)))
cachedTree UnionCache m h
unionCache) ((Vector (Ref (Run m h))
-> m (Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))))
-> m (LookupTree
(Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))))
-> (Vector (Ref (Run m h))
-> m (Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))))
-> m (LookupTree
(Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))))
forall a b. (a -> b) -> a -> b
$ \Vector (Ref (Run m h))
runs ->
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
-> m (Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
forall a. m a -> m (Async m a)
forall (m :: * -> *) a. MonadAsync m => m a -> m (Async m a)
Async.async (m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
-> m (Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))))
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
-> m (Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
forall a b. (a -> b) -> a -> b
$ TableEnv m h
-> Vector (Ref (Run m h))
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
lookupsUnionSingleBatch TableEnv m h
tEnv Vector (Ref (Run m h))
runs
ResolveSerialisedValue
-> LookupTree
(Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
forall (m :: * -> *) h.
MonadAsync m =>
ResolveSerialisedValue
-> LookupTree (Async m (LookupAcc m h)) -> m (LookupAcc m h)
MT.foldLookupTree ResolveSerialisedValue
resolve LookupTree
(Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
treeResults
lookupsRegularAndUnion :: TableEnv m h
-> TableContent m h
-> UnionCache m h
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
lookupsRegularAndUnion TableEnv m h
tEnv TableContent m h
tc UnionCache m h
unionCache = do
Async m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
regularResult <- m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
-> m (Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
forall a. m a -> m (Async m a)
forall (m :: * -> *) a. MonadAsync m => m a -> m (Async m a)
Async.async (m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
-> m (Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))))
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
-> m (Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
forall a b. (a -> b) -> a -> b
$ TableEnv m h
-> TableContent m h
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
lookupsRegular TableEnv m h
tEnv TableContent m h
tc
LookupTree
(Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
treeResults <- ((Vector (Ref (Run m h))
-> m (Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))))
-> LookupTree (Vector (Ref (Run m h)))
-> m (LookupTree
(Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))))
-> LookupTree (Vector (Ref (Run m h)))
-> (Vector (Ref (Run m h))
-> m (Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))))
-> m (LookupTree
(Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))))
forall a b c. (a -> b -> c) -> b -> a -> c
flip (Vector (Ref (Run m h))
-> m (Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))))
-> LookupTree (Vector (Ref (Run m h)))
-> m (LookupTree
(Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))))
forall (m :: * -> *) a b.
Monad m =>
(a -> m b) -> LookupTree a -> m (LookupTree b)
MT.mapMStrict (UnionCache m h -> LookupTree (Vector (Ref (Run m h)))
forall (m :: * -> *) h.
UnionCache m h -> LookupTree (Vector (Ref (Run m h)))
cachedTree UnionCache m h
unionCache) ((Vector (Ref (Run m h))
-> m (Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))))
-> m (LookupTree
(Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))))
-> (Vector (Ref (Run m h))
-> m (Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))))
-> m (LookupTree
(Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))))
forall a b. (a -> b) -> a -> b
$ \Vector (Ref (Run m h))
runs ->
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
-> m (Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
forall a. m a -> m (Async m a)
forall (m :: * -> *) a. MonadAsync m => m a -> m (Async m a)
Async.async (m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
-> m (Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))))
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
-> m (Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
forall a b. (a -> b) -> a -> b
$ TableEnv m h
-> Vector (Ref (Run m h))
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
lookupsUnionSingleBatch TableEnv m h
tEnv Vector (Ref (Run m h))
runs
ResolveSerialisedValue
-> LookupTree
(Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
forall (m :: * -> *) h.
MonadAsync m =>
ResolveSerialisedValue
-> LookupTree (Async m (LookupAcc m h)) -> m (LookupAcc m h)
MT.foldLookupTree ResolveSerialisedValue
resolve (LookupTree
(Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
-> LookupTree
(Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
forall a b. (a -> b) -> a -> b
$
TreeMergeType
-> Vector
(LookupTree
(Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))))
-> LookupTree
(Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
forall a. TreeMergeType -> Vector (LookupTree a) -> LookupTree a
MT.mkLookupNode TreeMergeType
MR.MergeLevel (Vector
(LookupTree
(Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))))
-> LookupTree
(Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))))
-> Vector
(LookupTree
(Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))))
-> LookupTree
(Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
forall a b. (a -> b) -> a -> b
$ [LookupTree
(Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))]
-> Vector
(LookupTree
(Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))))
forall a. [a] -> Vector a
V.fromList
[ Async m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
-> LookupTree
(Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
forall a. a -> LookupTree a
MT.LookupBatch Async m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
regularResult
, LookupTree
(Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
treeResults
]
lookupsUnionSingleBatch :: TableEnv m h
-> Vector (Ref (Run m h))
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
lookupsUnionSingleBatch TableEnv m h
tEnv Vector (Ref (Run m h))
runs =
HasBlockIO m h
-> ArenaManager (PrimState m)
-> ResolveSerialisedValue
-> Salt
-> Vector (Ref (Run m h))
-> Vector (Bloom SerialisedKey)
-> Vector Index
-> Vector (Handle h)
-> Vector SerialisedKey
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
forall (m :: * -> *) h.
(MonadThrow m, MonadST m) =>
HasBlockIO m h
-> ArenaManager (PrimState m)
-> ResolveSerialisedValue
-> Salt
-> Vector (Ref (Run m h))
-> Vector (Bloom SerialisedKey)
-> Vector Index
-> Vector (Handle h)
-> Vector SerialisedKey
-> m (LookupAcc m h)
lookupsIO
(TableEnv m h -> HasBlockIO m h
forall (m :: * -> *) h. TableEnv m h -> HasBlockIO m h
tableHasBlockIO TableEnv m h
tEnv)
(Table m h -> ArenaManager (PrimState m)
forall (m :: * -> *) h. Table m h -> ArenaManager (PrimState m)
tableArenaManager Table m h
t)
ResolveSerialisedValue
resolve
(TableEnv m h -> Salt
forall (m :: * -> *) h. TableEnv m h -> Salt
tableSessionSalt TableEnv m h
tEnv)
Vector (Ref (Run m h))
runs
((Ref (Run m h) -> Bloom SerialisedKey)
-> Vector (Ref (Run m h)) -> Vector (Bloom SerialisedKey)
forall a b. (a -> b) -> Vector a -> Vector b
V.mapStrict (\(DeRef Run m h
r) -> Run m h -> Bloom SerialisedKey
forall (m :: * -> *) h. Run m h -> Bloom SerialisedKey
Run.runFilter Run m h
r) Vector (Ref (Run m h))
runs)
((Ref (Run m h) -> Index) -> Vector (Ref (Run m h)) -> Vector Index
forall a b. (a -> b) -> Vector a -> Vector b
V.mapStrict (\(DeRef Run m h
r) -> Run m h -> Index
forall (m :: * -> *) h. Run m h -> Index
Run.runIndex Run m h
r) Vector (Ref (Run m h))
runs)
((Ref (Run m h) -> Handle h)
-> Vector (Ref (Run m h)) -> Vector (Handle h)
forall a b. (a -> b) -> Vector a -> Vector b
V.mapStrict (\(DeRef Run m h
r) -> Run m h -> Handle h
forall (m :: * -> *) h. Run m h -> Handle h
Run.runKOpsFile Run m h
r) Vector (Ref (Run m h))
runs)
Vector SerialisedKey
ks
{-# SPECIALISE rangeLookup ::
ResolveSerialisedValue
-> Range SerialisedKey
-> Table IO h
-> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef IO h) -> res)
-> IO (V.Vector res) #-}
rangeLookup ::
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m)
=> ResolveSerialisedValue
-> Range SerialisedKey
-> Table m h
-> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef m h) -> res)
-> m (V.Vector res)
rangeLookup :: forall (m :: * -> *) h res.
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m) =>
ResolveSerialisedValue
-> Range SerialisedKey
-> Table m h
-> (SerialisedKey
-> SerialisedValue -> Maybe (WeakBlobRef m h) -> res)
-> m (Vector res)
rangeLookup ResolveSerialisedValue
resolve Range SerialisedKey
range Table m h
t SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef m h) -> res
fromEntry = do
Tracer m TableTrace -> TableTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith (Table m h -> Tracer m TableTrace
forall (m :: * -> *) h. Table m h -> Tracer m TableTrace
tableTracer Table m h
t) (TableTrace -> m ()) -> TableTrace -> m ()
forall a b. (a -> b) -> a -> b
$ Range SerialisedKey -> TableTrace
TraceRangeLookup Range SerialisedKey
range
case Range SerialisedKey
range of
FromToExcluding SerialisedKey
lb SerialisedKey
ub ->
ResolveSerialisedValue
-> OffsetKey
-> Table m h
-> (Cursor m h -> m (Vector res))
-> m (Vector res)
forall (m :: * -> *) h a.
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m) =>
ResolveSerialisedValue
-> OffsetKey -> Table m h -> (Cursor m h -> m a) -> m a
withCursor ResolveSerialisedValue
resolve (SerialisedKey -> OffsetKey
OffsetKey SerialisedKey
lb) Table m h
t ((Cursor m h -> m (Vector res)) -> m (Vector res))
-> (Cursor m h -> m (Vector res)) -> m (Vector res)
forall a b. (a -> b) -> a -> b
$ \Cursor m h
cursor ->
Cursor m h
-> (SerialisedKey -> Bool) -> [Vector res] -> m (Vector res)
go Cursor m h
cursor (SerialisedKey -> SerialisedKey -> Bool
forall a. Ord a => a -> a -> Bool
< SerialisedKey
ub) []
FromToIncluding SerialisedKey
lb SerialisedKey
ub ->
ResolveSerialisedValue
-> OffsetKey
-> Table m h
-> (Cursor m h -> m (Vector res))
-> m (Vector res)
forall (m :: * -> *) h a.
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m) =>
ResolveSerialisedValue
-> OffsetKey -> Table m h -> (Cursor m h -> m a) -> m a
withCursor ResolveSerialisedValue
resolve (SerialisedKey -> OffsetKey
OffsetKey SerialisedKey
lb) Table m h
t ((Cursor m h -> m (Vector res)) -> m (Vector res))
-> (Cursor m h -> m (Vector res)) -> m (Vector res)
forall a b. (a -> b) -> a -> b
$ \Cursor m h
cursor ->
Cursor m h
-> (SerialisedKey -> Bool) -> [Vector res] -> m (Vector res)
go Cursor m h
cursor (SerialisedKey -> SerialisedKey -> Bool
forall a. Ord a => a -> a -> Bool
<= SerialisedKey
ub) []
where
chunkSize :: Int
chunkSize = Int
500
go :: Cursor m h
-> (SerialisedKey -> Bool) -> [Vector res] -> m (Vector res)
go Cursor m h
cursor SerialisedKey -> Bool
isInUpperBound ![Vector res]
chunks = do
Vector res
chunk <- ResolveSerialisedValue
-> (SerialisedKey -> Bool)
-> Int
-> Cursor m h
-> (SerialisedKey
-> SerialisedValue -> Maybe (WeakBlobRef m h) -> res)
-> m (Vector res)
forall (m :: * -> *) h res.
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m) =>
ResolveSerialisedValue
-> (SerialisedKey -> Bool)
-> Int
-> Cursor m h
-> (SerialisedKey
-> SerialisedValue -> Maybe (WeakBlobRef m h) -> res)
-> m (Vector res)
readCursorWhile ResolveSerialisedValue
resolve SerialisedKey -> Bool
isInUpperBound Int
chunkSize Cursor m h
cursor SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef m h) -> res
fromEntry
let !n :: Int
n = Vector res -> Int
forall a. Vector a -> Int
V.length Vector res
chunk
if Int
n Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
chunkSize
then Cursor m h
-> (SerialisedKey -> Bool) -> [Vector res] -> m (Vector res)
go Cursor m h
cursor SerialisedKey -> Bool
isInUpperBound (Vector res
chunk Vector res -> [Vector res] -> [Vector res]
forall a. a -> [a] -> [a]
: [Vector res]
chunks)
else Vector res -> m (Vector res)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ([Vector res] -> Vector res
forall a. [Vector a] -> Vector a
V.concat ([Vector res] -> [Vector res]
forall a. [a] -> [a]
reverse (Int -> Int -> Vector res -> Vector res
forall a. Int -> Int -> Vector a -> Vector a
V.slice Int
0 Int
n Vector res
chunk Vector res -> [Vector res] -> [Vector res]
forall a. a -> [a] -> [a]
: [Vector res]
chunks)))
{-# SPECIALISE updates ::
ResolveSerialisedValue
-> V.Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)
-> Table IO h
-> IO () #-}
updates ::
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m)
=> ResolveSerialisedValue
-> V.Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)
-> Table m h
-> m ()
updates :: forall (m :: * -> *) h.
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m) =>
ResolveSerialisedValue
-> Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)
-> Table m h
-> m ()
updates ResolveSerialisedValue
resolve Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)
es Table m h
t = do
Tracer m TableTrace -> TableTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith (Table m h -> Tracer m TableTrace
forall (m :: * -> *) h. Table m h -> Tracer m TableTrace
tableTracer Table m h
t) (TableTrace -> m ()) -> TableTrace -> m ()
forall a b. (a -> b) -> a -> b
$ Int -> TableTrace
TraceUpdates (Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob) -> Int
forall a. Vector a -> Int
V.length Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)
es)
let conf :: TableConfig
conf = Table m h -> TableConfig
forall (m :: * -> *) h. Table m h -> TableConfig
tableConfig Table m h
t
Table m h -> (TableEnv m h -> m ()) -> m ()
forall (m :: * -> *) h a.
(MonadSTM m, MonadThrow m) =>
Table m h -> (TableEnv m h -> m a) -> m a
withKeepTableOpen Table m h
t ((TableEnv m h -> m ()) -> m ()) -> (TableEnv m h -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \TableEnv m h
tEnv -> do
let hfs :: HasFS m h
hfs = TableEnv m h -> HasFS m h
forall (m :: * -> *) h. TableEnv m h -> HasFS m h
tableHasFS TableEnv m h
tEnv
m (TableContent m h)
-> (TableContent m h -> m ())
-> (ActionRegistry m -> TableContent m h -> m (TableContent m h))
-> m ()
forall (m :: * -> *) st.
(PrimMonad m, MonadCatch m) =>
m st -> (st -> m ()) -> (ActionRegistry m -> st -> m st) -> m ()
modifyWithActionRegistry_
(RWVar m (TableContent m h) -> m (TableContent m h)
forall (m :: * -> *) a.
(MonadSTM m, MonadCatch m) =>
RWVar m a -> m a
RW.unsafeAcquireWriteAccess (TableEnv m h -> RWVar m (TableContent m h)
forall (m :: * -> *) h. TableEnv m h -> RWVar m (TableContent m h)
tableContent TableEnv m h
tEnv))
(STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ())
-> (TableContent m h -> STM m ()) -> TableContent m h -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. RWVar m (TableContent m h) -> TableContent m h -> STM m ()
forall (m :: * -> *) a. MonadSTM m => RWVar m a -> a -> STM m ()
RW.unsafeReleaseWriteAccess (TableEnv m h -> RWVar m (TableContent m h)
forall (m :: * -> *) h. TableEnv m h -> RWVar m (TableContent m h)
tableContent TableEnv m h
tEnv)) ((ActionRegistry m -> TableContent m h -> m (TableContent m h))
-> m ())
-> (ActionRegistry m -> TableContent m h -> m (TableContent m h))
-> m ()
forall a b. (a -> b) -> a -> b
$ \ActionRegistry m
reg TableContent m h
tc -> do
TableContent m h
tc' <-
Tracer m (AtLevel MergeTrace)
-> TableConfig
-> ResolveSerialisedValue
-> HasFS m h
-> HasBlockIO m h
-> SessionRoot
-> Salt
-> UniqCounter m
-> Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)
-> ActionRegistry m
-> TableContent m h
-> m (TableContent m h)
forall (m :: * -> *) h.
(MonadMask m, MonadMVar m, MonadSTM m, MonadST m) =>
Tracer m (AtLevel MergeTrace)
-> TableConfig
-> ResolveSerialisedValue
-> HasFS m h
-> HasBlockIO m h
-> SessionRoot
-> Salt
-> UniqCounter m
-> Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)
-> ActionRegistry m
-> TableContent m h
-> m (TableContent m h)
updatesWithInterleavedFlushes
(Tracer m TableTrace -> Tracer m (AtLevel MergeTrace)
forall (m :: * -> *).
Monad m =>
Tracer m TableTrace -> Tracer m (AtLevel MergeTrace)
contramapTraceMerge (Tracer m TableTrace -> Tracer m (AtLevel MergeTrace))
-> Tracer m TableTrace -> Tracer m (AtLevel MergeTrace)
forall a b. (a -> b) -> a -> b
$ Table m h -> Tracer m TableTrace
forall (m :: * -> *) h. Table m h -> Tracer m TableTrace
tableTracer Table m h
t)
TableConfig
conf
ResolveSerialisedValue
resolve
HasFS m h
hfs
(TableEnv m h -> HasBlockIO m h
forall (m :: * -> *) h. TableEnv m h -> HasBlockIO m h
tableHasBlockIO TableEnv m h
tEnv)
(TableEnv m h -> SessionRoot
forall (m :: * -> *) h. TableEnv m h -> SessionRoot
tableSessionRoot TableEnv m h
tEnv)
(TableEnv m h -> Salt
forall (m :: * -> *) h. TableEnv m h -> Salt
tableSessionSalt TableEnv m h
tEnv)
(Table m h -> UniqCounter m
forall (m :: * -> *) h. Table m h -> UniqCounter m
tableSessionUniqCounter Table m h
t)
Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)
es
ActionRegistry m
reg
TableContent m h
tc
ActionRegistry m -> m () -> m ()
forall (m :: * -> *).
(PrimMonad m, HasCallStack) =>
ActionRegistry m -> m () -> m ()
delayedCommit ActionRegistry m
reg (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
Tracer m TableTrace -> TableTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith (Table m h -> Tracer m TableTrace
forall (m :: * -> *) h. Table m h -> Tracer m TableTrace
tableTracer Table m h
t) (TableTrace -> m ()) -> TableTrace -> m ()
forall a b. (a -> b) -> a -> b
$ Int -> TableTrace
TraceUpdated (Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob) -> Int
forall a. Vector a -> Int
V.length Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)
es)
TableContent m h -> m (TableContent m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure TableContent m h
tc'
data BlobRefInvalidError
=
ErrBlobRefInvalid !Int
deriving stock (Int -> BlobRefInvalidError -> ShowS
[BlobRefInvalidError] -> ShowS
BlobRefInvalidError -> String
(Int -> BlobRefInvalidError -> ShowS)
-> (BlobRefInvalidError -> String)
-> ([BlobRefInvalidError] -> ShowS)
-> Show BlobRefInvalidError
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> BlobRefInvalidError -> ShowS
showsPrec :: Int -> BlobRefInvalidError -> ShowS
$cshow :: BlobRefInvalidError -> String
show :: BlobRefInvalidError -> String
$cshowList :: [BlobRefInvalidError] -> ShowS
showList :: [BlobRefInvalidError] -> ShowS
Show, BlobRefInvalidError -> BlobRefInvalidError -> Bool
(BlobRefInvalidError -> BlobRefInvalidError -> Bool)
-> (BlobRefInvalidError -> BlobRefInvalidError -> Bool)
-> Eq BlobRefInvalidError
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: BlobRefInvalidError -> BlobRefInvalidError -> Bool
== :: BlobRefInvalidError -> BlobRefInvalidError -> Bool
$c/= :: BlobRefInvalidError -> BlobRefInvalidError -> Bool
/= :: BlobRefInvalidError -> BlobRefInvalidError -> Bool
Eq)
deriving anyclass (Show BlobRefInvalidError
Typeable BlobRefInvalidError
(Typeable BlobRefInvalidError, Show BlobRefInvalidError) =>
(BlobRefInvalidError -> SomeException)
-> (SomeException -> Maybe BlobRefInvalidError)
-> (BlobRefInvalidError -> String)
-> Exception BlobRefInvalidError
SomeException -> Maybe BlobRefInvalidError
BlobRefInvalidError -> String
BlobRefInvalidError -> SomeException
forall e.
(Typeable e, Show e) =>
(e -> SomeException)
-> (SomeException -> Maybe e) -> (e -> String) -> Exception e
$ctoException :: BlobRefInvalidError -> SomeException
toException :: BlobRefInvalidError -> SomeException
$cfromException :: SomeException -> Maybe BlobRefInvalidError
fromException :: SomeException -> Maybe BlobRefInvalidError
$cdisplayException :: BlobRefInvalidError -> String
displayException :: BlobRefInvalidError -> String
Exception)
{-# SPECIALISE retrieveBlobs ::
Session IO h
-> V.Vector (WeakBlobRef IO h)
-> IO (V.Vector SerialisedBlob) #-}
retrieveBlobs ::
(MonadMask m, MonadST m, MonadSTM m)
=> Session m h
-> V.Vector (WeakBlobRef m h)
-> m (V.Vector SerialisedBlob)
retrieveBlobs :: forall (m :: * -> *) h.
(MonadMask m, MonadST m, MonadSTM m) =>
Session m h
-> Vector (WeakBlobRef m h) -> m (Vector SerialisedBlob)
retrieveBlobs Session m h
sesh Vector (WeakBlobRef m h)
wrefs = do
Tracer m SessionTrace -> SessionTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith (Session m h -> Tracer m SessionTrace
forall (m :: * -> *) h. Session m h -> Tracer m SessionTrace
sessionTracer Session m h
sesh) (SessionTrace -> m ()) -> SessionTrace -> m ()
forall a b. (a -> b) -> a -> b
$ Int -> SessionTrace
TraceRetrieveBlobs (Vector (WeakBlobRef m h) -> Int
forall a. Vector a -> Int
V.length Vector (WeakBlobRef m h)
wrefs)
Session m h
-> (SessionEnv m h -> m (Vector SerialisedBlob))
-> m (Vector SerialisedBlob)
forall (m :: * -> *) h a.
(MonadSTM m, MonadThrow m) =>
Session m h -> (SessionEnv m h -> m a) -> m a
withKeepSessionOpen Session m h
sesh ((SessionEnv m h -> m (Vector SerialisedBlob))
-> m (Vector SerialisedBlob))
-> (SessionEnv m h -> m (Vector SerialisedBlob))
-> m (Vector SerialisedBlob)
forall a b. (a -> b) -> a -> b
$ \SessionEnv m h
seshEnv ->
let hbio :: HasBlockIO m h
hbio = SessionEnv m h -> HasBlockIO m h
forall (m :: * -> *) h. SessionEnv m h -> HasBlockIO m h
sessionHasBlockIO SessionEnv m h
seshEnv in
(WeakBlobRefInvalid -> m (Vector SerialisedBlob))
-> m (Vector SerialisedBlob) -> m (Vector SerialisedBlob)
forall e a. Exception e => (e -> m a) -> m a -> m a
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
(e -> m a) -> m a -> m a
handle (\(BlobRef.WeakBlobRefInvalid Int
i) ->
BlobRefInvalidError -> m (Vector SerialisedBlob)
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (Int -> BlobRefInvalidError
ErrBlobRefInvalid Int
i)) (m (Vector SerialisedBlob) -> m (Vector SerialisedBlob))
-> m (Vector SerialisedBlob) -> m (Vector SerialisedBlob)
forall a b. (a -> b) -> a -> b
$
HasBlockIO m h
-> Vector (WeakBlobRef m h) -> m (Vector SerialisedBlob)
forall (m :: * -> *) h.
(MonadMask m, PrimMonad m) =>
HasBlockIO m h
-> Vector (WeakBlobRef m h) -> m (Vector SerialisedBlob)
BlobRef.readWeakBlobRefs HasBlockIO m h
hbio Vector (WeakBlobRef m h)
wrefs
data Cursor m h = Cursor {
forall (m :: * -> *) h.
Cursor m h -> StrictMVar m (CursorState m h)
cursorState :: !(StrictMVar m (CursorState m h))
, forall (m :: * -> *) h. Cursor m h -> Tracer m CursorTrace
cursorTracer :: !(Tracer m CursorTrace)
}
instance NFData (Cursor m h) where
rnf :: Cursor m h -> ()
rnf (Cursor StrictMVar m (CursorState m h)
a Tracer m CursorTrace
b) = StrictMVar m (CursorState m h) -> ()
forall a. a -> ()
rwhnf StrictMVar m (CursorState m h)
a () -> () -> ()
forall a b. a -> b -> b
`seq` Tracer m CursorTrace -> ()
forall a. a -> ()
rwhnf Tracer m CursorTrace
b
data CursorState m h =
CursorOpen !(CursorEnv m h)
| CursorClosed
data CursorEnv m h = CursorEnv {
forall (m :: * -> *) h. CursorEnv m h -> Session m h
cursorSession :: !(Session m h)
, forall (m :: * -> *) h. CursorEnv m h -> SessionEnv m h
cursorSessionEnv :: !(SessionEnv m h)
, forall (m :: * -> *) h. CursorEnv m h -> CursorId
cursorId :: !CursorId
, forall (m :: * -> *) h. CursorEnv m h -> Maybe (Readers m h)
cursorReaders :: !(Maybe (Readers.Readers m h))
, forall (m :: * -> *) h. CursorEnv m h -> Ref (WriteBufferBlobs m h)
cursorWBB :: !(Ref (WBB.WriteBufferBlobs m h))
, forall (m :: * -> *) h. CursorEnv m h -> Vector (Ref (Run m h))
cursorRuns :: !(V.Vector (Ref (Run m h)))
, forall (m :: * -> *) h. CursorEnv m h -> Maybe (UnionCache m h)
cursorUnion :: !(Maybe (UnionCache m h))
}
{-# SPECIALISE withCursor ::
ResolveSerialisedValue
-> OffsetKey
-> Table IO h
-> (Cursor IO h -> IO a)
-> IO a #-}
withCursor ::
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m)
=> ResolveSerialisedValue
-> OffsetKey
-> Table m h
-> (Cursor m h -> m a)
-> m a
withCursor :: forall (m :: * -> *) h a.
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m) =>
ResolveSerialisedValue
-> OffsetKey -> Table m h -> (Cursor m h -> m a) -> m a
withCursor ResolveSerialisedValue
resolve OffsetKey
offsetKey Table m h
t = m (Cursor m h)
-> (Cursor m h -> m ()) -> (Cursor m h -> m a) -> m a
forall a b c. m a -> (a -> m b) -> (a -> m c) -> m c
forall (m :: * -> *) a b c.
MonadThrow m =>
m a -> (a -> m b) -> (a -> m c) -> m c
bracket (ResolveSerialisedValue -> OffsetKey -> Table m h -> m (Cursor m h)
forall (m :: * -> *) h.
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m) =>
ResolveSerialisedValue -> OffsetKey -> Table m h -> m (Cursor m h)
newCursor ResolveSerialisedValue
resolve OffsetKey
offsetKey Table m h
t) Cursor m h -> m ()
forall (m :: * -> *) h.
(MonadMask m, MonadMVar m, MonadSTM m, PrimMonad m) =>
Cursor m h -> m ()
closeCursor
{-# SPECIALISE newCursor ::
ResolveSerialisedValue
-> OffsetKey
-> Table IO h
-> IO (Cursor IO h) #-}
newCursor ::
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m)
=> ResolveSerialisedValue
-> OffsetKey
-> Table m h
-> m (Cursor m h)
newCursor :: forall (m :: * -> *) h.
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m) =>
ResolveSerialisedValue -> OffsetKey -> Table m h -> m (Cursor m h)
newCursor !ResolveSerialisedValue
resolve !OffsetKey
offsetKey Table m h
t = Table m h -> (TableEnv m h -> m (Cursor m h)) -> m (Cursor m h)
forall (m :: * -> *) h a.
(MonadSTM m, MonadThrow m) =>
Table m h -> (TableEnv m h -> m a) -> m a
withKeepTableOpen Table m h
t ((TableEnv m h -> m (Cursor m h)) -> m (Cursor m h))
-> (TableEnv m h -> m (Cursor m h)) -> m (Cursor m h)
forall a b. (a -> b) -> a -> b
$ \TableEnv m h
tEnv -> do
let cursorSession :: Session m h
cursorSession = Table m h -> Session m h
forall (m :: * -> *) h. Table m h -> Session m h
tableSession Table m h
t
let cursorSessionEnv :: SessionEnv m h
cursorSessionEnv = TableEnv m h -> SessionEnv m h
forall (m :: * -> *) h. TableEnv m h -> SessionEnv m h
tableSessionEnv TableEnv m h
tEnv
CursorId
cursorId <- Unique -> CursorId
uniqueToCursorId (Unique -> CursorId) -> m Unique -> m CursorId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
UniqCounter m -> m Unique
forall (m :: * -> *). PrimMonad m => UniqCounter m -> m Unique
incrUniqCounter (Table m h -> UniqCounter m
forall (m :: * -> *) h. Table m h -> UniqCounter m
tableSessionUniqCounter Table m h
t)
let cursorTracer :: Tracer m CursorTrace
cursorTracer = CursorId -> CursorTrace -> LSMTreeTrace
TraceCursor CursorId
cursorId (CursorTrace -> LSMTreeTrace)
-> Tracer m LSMTreeTrace -> Tracer m CursorTrace
forall a' a. (a' -> a) -> Tracer m a -> Tracer m a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
`contramap` Session m h -> Tracer m LSMTreeTrace
forall (m :: * -> *) h. Session m h -> Tracer m LSMTreeTrace
lsmTreeTracer Session m h
cursorSession
Tracer m CursorTrace -> CursorTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith Tracer m CursorTrace
cursorTracer (CursorTrace -> m ()) -> CursorTrace -> m ()
forall a b. (a -> b) -> a -> b
$ TableId -> OffsetKey -> CursorTrace
TraceNewCursor (Table m h -> TableId
forall (m :: * -> *) h. Table m h -> TableId
tableId Table m h
t) OffsetKey
offsetKey
Session m h -> (SessionEnv m h -> m (Cursor m h)) -> m (Cursor m h)
forall (m :: * -> *) h a.
(MonadSTM m, MonadThrow m) =>
Session m h -> (SessionEnv m h -> m a) -> m a
withKeepSessionOpen Session m h
cursorSession ((SessionEnv m h -> m (Cursor m h)) -> m (Cursor m h))
-> (SessionEnv m h -> m (Cursor m h)) -> m (Cursor m h)
forall a b. (a -> b) -> a -> b
$ \SessionEnv m h
_ -> do
(ActionRegistry m -> m (Cursor m h)) -> m (Cursor m h)
forall (m :: * -> *) a.
(PrimMonad m, MonadCatch m) =>
(ActionRegistry m -> m a) -> m a
withActionRegistry ((ActionRegistry m -> m (Cursor m h)) -> m (Cursor m h))
-> (ActionRegistry m -> m (Cursor m h)) -> m (Cursor m h)
forall a b. (a -> b) -> a -> b
$ \ActionRegistry m
reg -> do
(WriteBuffer
wb, Ref (WriteBufferBlobs m h)
wbblobs, Vector (Ref (Run m h))
cursorRuns, Maybe (UnionCache m h)
cursorUnion) <-
ActionRegistry m
-> RWVar m (TableContent m h)
-> m (WriteBuffer, Ref (WriteBufferBlobs m h),
Vector (Ref (Run m h)), Maybe (UnionCache m h))
forall {m :: * -> *} {h}.
(MonadSTM m, PrimMonad m, MonadMask m) =>
ActionRegistry m
-> RWVar m (TableContent m h)
-> m (WriteBuffer, Ref (WriteBufferBlobs m h),
Vector (Ref (Run m h)), Maybe (UnionCache m h))
dupTableContent ActionRegistry m
reg (TableEnv m h -> RWVar m (TableContent m h)
forall (m :: * -> *) h. TableEnv m h -> RWVar m (TableContent m h)
tableContent TableEnv m h
tEnv)
let cursorSources :: [ReaderSource m h]
cursorSources =
WriteBuffer -> Ref (WriteBufferBlobs m h) -> ReaderSource m h
forall (m :: * -> *) h.
WriteBuffer -> Ref (WriteBufferBlobs m h) -> ReaderSource m h
Readers.FromWriteBuffer WriteBuffer
wb Ref (WriteBufferBlobs m h)
wbblobs
ReaderSource m h -> [ReaderSource m h] -> [ReaderSource m h]
forall a. a -> [a] -> [a]
: (Ref (Run m h) -> ReaderSource m h)
-> [Ref (Run m h)] -> [ReaderSource m h]
forall a b. (a -> b) -> [a] -> [b]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Ref (Run m h) -> ReaderSource m h
forall (m :: * -> *) h. Ref (Run m h) -> ReaderSource m h
Readers.FromRun (Vector (Ref (Run m h)) -> [Ref (Run m h)]
forall a. Vector a -> [a]
V.toList Vector (Ref (Run m h))
cursorRuns)
[ReaderSource m h] -> [ReaderSource m h] -> [ReaderSource m h]
forall a. Semigroup a => a -> a -> a
<> case Maybe (UnionCache m h)
cursorUnion of
Maybe (UnionCache m h)
Nothing -> []
Just (UnionCache LookupTree (Vector (Ref (Run m h)))
treeCache) ->
[LookupTree (Vector (Ref (Run m h))) -> ReaderSource m h
forall (m :: * -> *) h.
LookupTree (Vector (Ref (Run m h))) -> ReaderSource m h
lookupTreeToReaderSource LookupTree (Vector (Ref (Run m h)))
treeCache]
Maybe (Readers m h)
cursorReaders <-
ActionRegistry m
-> m (Maybe (Readers m h))
-> (Readers m h -> m ())
-> m (Maybe (Readers m h))
forall (m :: * -> *) a.
(PrimMonad m, MonadMask m, HasCallStack) =>
ActionRegistry m -> m (Maybe a) -> (a -> m ()) -> m (Maybe a)
withRollbackMaybe ActionRegistry m
reg
(ResolveSerialisedValue
-> OffsetKey -> [ReaderSource m h] -> m (Maybe (Readers m h))
forall (m :: * -> *) h.
(MonadMask m, MonadST m, MonadSTM m) =>
ResolveSerialisedValue
-> OffsetKey -> [ReaderSource m h] -> m (Maybe (Readers m h))
Readers.new ResolveSerialisedValue
resolve OffsetKey
offsetKey [ReaderSource m h]
cursorSources)
Readers m h -> m ()
forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, PrimMonad m) =>
Readers m h -> m ()
Readers.close
let cursorWBB :: Ref (WriteBufferBlobs m h)
cursorWBB = Ref (WriteBufferBlobs m h)
wbblobs
StrictMVar m (CursorState m h)
cursorState <- CursorState m h -> m (StrictMVar m (CursorState m h))
forall (m :: * -> *) a. MonadMVar m => a -> m (StrictMVar m a)
newMVar (CursorEnv m h -> CursorState m h
forall (m :: * -> *) h. CursorEnv m h -> CursorState m h
CursorOpen CursorEnv {Maybe (Readers m h)
Maybe (UnionCache m h)
Ref (WriteBufferBlobs m h)
Vector (Ref (Run m h))
CursorId
SessionEnv m h
Session m h
cursorSessionEnv :: SessionEnv m h
cursorSession :: Session m h
cursorId :: CursorId
cursorWBB :: Ref (WriteBufferBlobs m h)
cursorRuns :: Vector (Ref (Run m h))
cursorReaders :: Maybe (Readers m h)
cursorUnion :: Maybe (UnionCache m h)
cursorSession :: Session m h
cursorSessionEnv :: SessionEnv m h
cursorId :: CursorId
cursorRuns :: Vector (Ref (Run m h))
cursorUnion :: Maybe (UnionCache m h)
cursorReaders :: Maybe (Readers m h)
cursorWBB :: Ref (WriteBufferBlobs m h)
..})
let !cursor :: Cursor m h
cursor = Cursor {StrictMVar m (CursorState m h)
cursorState :: StrictMVar m (CursorState m h)
cursorState :: StrictMVar m (CursorState m h)
cursorState, Tracer m CursorTrace
cursorTracer :: Tracer m CursorTrace
cursorTracer :: Tracer m CursorTrace
cursorTracer}
ActionRegistry m -> m () -> m ()
forall (m :: * -> *).
(PrimMonad m, HasCallStack) =>
ActionRegistry m -> m () -> m ()
delayedCommit ActionRegistry m
reg (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
StrictMVar m (Map CursorId (Cursor m h))
-> (Map CursorId (Cursor m h) -> m (Map CursorId (Cursor m h)))
-> m ()
forall (m :: * -> *) a.
MonadMVar m =>
StrictMVar m a -> (a -> m a) -> m ()
modifyMVar_ (SessionEnv m h -> StrictMVar m (Map CursorId (Cursor m h))
forall (m :: * -> *) h.
SessionEnv m h -> StrictMVar m (Map CursorId (Cursor m h))
sessionOpenCursors SessionEnv m h
cursorSessionEnv) ((Map CursorId (Cursor m h) -> m (Map CursorId (Cursor m h)))
-> m ())
-> (Map CursorId (Cursor m h) -> m (Map CursorId (Cursor m h)))
-> m ()
forall a b. (a -> b) -> a -> b
$
Map CursorId (Cursor m h) -> m (Map CursorId (Cursor m h))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Map CursorId (Cursor m h) -> m (Map CursorId (Cursor m h)))
-> (Map CursorId (Cursor m h) -> Map CursorId (Cursor m h))
-> Map CursorId (Cursor m h)
-> m (Map CursorId (Cursor m h))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. CursorId
-> Cursor m h
-> Map CursorId (Cursor m h)
-> Map CursorId (Cursor m h)
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert CursorId
cursorId Cursor m h
cursor
ActionRegistry m -> m () -> m ()
forall (m :: * -> *).
(PrimMonad m, HasCallStack) =>
ActionRegistry m -> m () -> m ()
delayedCommit ActionRegistry m
reg (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
Tracer m CursorTrace -> CursorTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith Tracer m CursorTrace
cursorTracer (CursorTrace -> m ()) -> CursorTrace -> m ()
forall a b. (a -> b) -> a -> b
$ SessionId -> CursorTrace
TraceCreatedCursor (TableEnv m h -> SessionId
forall (m :: * -> *) h. TableEnv m h -> SessionId
tableSessionId TableEnv m h
tEnv)
Cursor m h -> m (Cursor m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Cursor m h -> m (Cursor m h)) -> Cursor m h -> m (Cursor m h)
forall a b. (a -> b) -> a -> b
$! Cursor m h
cursor
where
dupTableContent :: ActionRegistry m
-> RWVar m (TableContent m h)
-> m (WriteBuffer, Ref (WriteBufferBlobs m h),
Vector (Ref (Run m h)), Maybe (UnionCache m h))
dupTableContent ActionRegistry m
reg RWVar m (TableContent m h)
contentVar = do
RWVar m (TableContent m h)
-> (TableContent m h
-> m (WriteBuffer, Ref (WriteBufferBlobs m h),
Vector (Ref (Run m h)), Maybe (UnionCache m h)))
-> m (WriteBuffer, Ref (WriteBufferBlobs m h),
Vector (Ref (Run m h)), Maybe (UnionCache m h))
forall (m :: * -> *) a b.
(MonadSTM m, MonadThrow m) =>
RWVar m a -> (a -> m b) -> m b
RW.withReadAccess RWVar m (TableContent m h)
contentVar ((TableContent m h
-> m (WriteBuffer, Ref (WriteBufferBlobs m h),
Vector (Ref (Run m h)), Maybe (UnionCache m h)))
-> m (WriteBuffer, Ref (WriteBufferBlobs m h),
Vector (Ref (Run m h)), Maybe (UnionCache m h)))
-> (TableContent m h
-> m (WriteBuffer, Ref (WriteBufferBlobs m h),
Vector (Ref (Run m h)), Maybe (UnionCache m h)))
-> m (WriteBuffer, Ref (WriteBufferBlobs m h),
Vector (Ref (Run m h)), Maybe (UnionCache m h))
forall a b. (a -> b) -> a -> b
$ \TableContent m h
content -> do
let !wb :: WriteBuffer
wb = TableContent m h -> WriteBuffer
forall (m :: * -> *) h. TableContent m h -> WriteBuffer
tableWriteBuffer TableContent m h
content
!wbblobs :: Ref (WriteBufferBlobs m h)
wbblobs = TableContent m h -> Ref (WriteBufferBlobs m h)
forall (m :: * -> *) h.
TableContent m h -> Ref (WriteBufferBlobs m h)
tableWriteBufferBlobs TableContent m h
content
Ref (WriteBufferBlobs m h)
wbblobs' <- ActionRegistry m
-> m (Ref (WriteBufferBlobs m h))
-> (Ref (WriteBufferBlobs m h) -> m ())
-> m (Ref (WriteBufferBlobs m h))
forall (m :: * -> *) a.
(PrimMonad m, MonadMask m, HasCallStack) =>
ActionRegistry m -> m a -> (a -> m ()) -> m a
withRollback ActionRegistry m
reg (Ref (WriteBufferBlobs m h) -> m (Ref (WriteBufferBlobs m h))
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadThrow m, HasCallStack) =>
Ref obj -> m (Ref obj)
dupRef Ref (WriteBufferBlobs m h)
wbblobs) Ref (WriteBufferBlobs m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef
let runs :: Vector (Ref (Run m h))
runs = LevelsCache m h -> Vector (Ref (Run m h))
forall (m :: * -> *) h. LevelsCache m h -> Vector (Ref (Run m h))
cachedRuns (TableContent m h -> LevelsCache m h
forall (m :: * -> *) h. TableContent m h -> LevelsCache m h
tableCache TableContent m h
content)
Vector (Ref (Run m h))
runs' <- Vector (Ref (Run m h))
-> (Ref (Run m h) -> m (Ref (Run m h)))
-> m (Vector (Ref (Run m h)))
forall (m :: * -> *) a b.
Monad m =>
Vector a -> (a -> m b) -> m (Vector b)
V.forM Vector (Ref (Run m h))
runs ((Ref (Run m h) -> m (Ref (Run m h)))
-> m (Vector (Ref (Run m h))))
-> (Ref (Run m h) -> m (Ref (Run m h)))
-> m (Vector (Ref (Run m h)))
forall a b. (a -> b) -> a -> b
$ \Ref (Run m h)
r ->
ActionRegistry m
-> m (Ref (Run m h))
-> (Ref (Run m h) -> m ())
-> m (Ref (Run m h))
forall (m :: * -> *) a.
(PrimMonad m, MonadMask m, HasCallStack) =>
ActionRegistry m -> m a -> (a -> m ()) -> m a
withRollback ActionRegistry m
reg (Ref (Run m h) -> m (Ref (Run m h))
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadThrow m, HasCallStack) =>
Ref obj -> m (Ref obj)
dupRef Ref (Run m h)
r) Ref (Run m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef
Maybe (UnionCache m h)
unionCache <- case TableContent m h -> UnionLevel m h
forall (m :: * -> *) h. TableContent m h -> UnionLevel m h
tableUnionLevel TableContent m h
content of
UnionLevel m h
NoUnion -> Maybe (UnionCache m h) -> m (Maybe (UnionCache m h))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe (UnionCache m h)
forall a. Maybe a
Nothing
Union Ref (MergingTree m h)
_ UnionCache m h
c -> UnionCache m h -> Maybe (UnionCache m h)
forall a. a -> Maybe a
Just (UnionCache m h -> Maybe (UnionCache m h))
-> m (UnionCache m h) -> m (Maybe (UnionCache m h))
forall (m :: * -> *) a b. Monad m => (a -> b) -> m a -> m b
<$!> ActionRegistry m -> UnionCache m h -> m (UnionCache m h)
forall (m :: * -> *) h.
(PrimMonad m, MonadMask m) =>
ActionRegistry m -> UnionCache m h -> m (UnionCache m h)
duplicateUnionCache ActionRegistry m
reg UnionCache m h
c
(WriteBuffer, Ref (WriteBufferBlobs m h), Vector (Ref (Run m h)),
Maybe (UnionCache m h))
-> m (WriteBuffer, Ref (WriteBufferBlobs m h),
Vector (Ref (Run m h)), Maybe (UnionCache m h))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (WriteBuffer
wb, Ref (WriteBufferBlobs m h)
wbblobs', Vector (Ref (Run m h))
runs', Maybe (UnionCache m h)
unionCache)
lookupTreeToReaderSource ::
MT.LookupTree (V.Vector (Ref (Run m h))) -> Readers.ReaderSource m h
lookupTreeToReaderSource :: forall (m :: * -> *) h.
LookupTree (Vector (Ref (Run m h))) -> ReaderSource m h
lookupTreeToReaderSource = \case
MT.LookupBatch Vector (Ref (Run m h))
v ->
case (Ref (Run m h) -> ReaderSource m h)
-> [Ref (Run m h)] -> [ReaderSource m h]
forall a b. (a -> b) -> [a] -> [b]
map Ref (Run m h) -> ReaderSource m h
forall (m :: * -> *) h. Ref (Run m h) -> ReaderSource m h
Readers.FromRun (Vector (Ref (Run m h)) -> [Ref (Run m h)]
forall a. Vector a -> [a]
V.toList Vector (Ref (Run m h))
v) of
[ReaderSource m h
src] -> ReaderSource m h
src
[ReaderSource m h]
srcs -> ReadersMergeType -> [ReaderSource m h] -> ReaderSource m h
forall (m :: * -> *) h.
ReadersMergeType -> [ReaderSource m h] -> ReaderSource m h
Readers.FromReaders ReadersMergeType
Readers.MergeLevel [ReaderSource m h]
srcs
MT.LookupNode TreeMergeType
ty Vector (LookupTree (Vector (Ref (Run m h))))
children ->
ReadersMergeType -> [ReaderSource m h] -> ReaderSource m h
forall (m :: * -> *) h.
ReadersMergeType -> [ReaderSource m h] -> ReaderSource m h
Readers.FromReaders
(TreeMergeType -> ReadersMergeType
convertMergeType TreeMergeType
ty)
((LookupTree (Vector (Ref (Run m h))) -> ReaderSource m h)
-> [LookupTree (Vector (Ref (Run m h)))] -> [ReaderSource m h]
forall a b. (a -> b) -> [a] -> [b]
map LookupTree (Vector (Ref (Run m h))) -> ReaderSource m h
forall (m :: * -> *) h.
LookupTree (Vector (Ref (Run m h))) -> ReaderSource m h
lookupTreeToReaderSource (Vector (LookupTree (Vector (Ref (Run m h))))
-> [LookupTree (Vector (Ref (Run m h)))]
forall a. Vector a -> [a]
V.toList Vector (LookupTree (Vector (Ref (Run m h))))
children))
where
convertMergeType :: TreeMergeType -> ReadersMergeType
convertMergeType = \case
TreeMergeType
MR.MergeUnion -> ReadersMergeType
Readers.MergeUnion
TreeMergeType
MR.MergeLevel -> ReadersMergeType
Readers.MergeLevel
{-# SPECIALISE closeCursor :: Cursor IO h -> IO () #-}
closeCursor ::
(MonadMask m, MonadMVar m, MonadSTM m, PrimMonad m)
=> Cursor m h
-> m ()
closeCursor :: forall (m :: * -> *) h.
(MonadMask m, MonadMVar m, MonadSTM m, PrimMonad m) =>
Cursor m h -> m ()
closeCursor Cursor {Tracer m CursorTrace
StrictMVar m (CursorState m h)
cursorState :: forall (m :: * -> *) h.
Cursor m h -> StrictMVar m (CursorState m h)
cursorTracer :: forall (m :: * -> *) h. Cursor m h -> Tracer m CursorTrace
cursorState :: StrictMVar m (CursorState m h)
cursorTracer :: Tracer m CursorTrace
..} = do
Tracer m CursorTrace -> CursorTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith Tracer m CursorTrace
cursorTracer (CursorTrace -> m ()) -> CursorTrace -> m ()
forall a b. (a -> b) -> a -> b
$ CursorTrace
TraceCloseCursor
m (CursorState m h)
-> (CursorState m h -> m ())
-> (ActionRegistry m -> CursorState m h -> m (CursorState m h))
-> m ()
forall (m :: * -> *) st.
(PrimMonad m, MonadCatch m) =>
m st -> (st -> m ()) -> (ActionRegistry m -> st -> m st) -> m ()
modifyWithActionRegistry_ (StrictMVar m (CursorState m h) -> m (CursorState m h)
forall (m :: * -> *) a. MonadMVar m => StrictMVar m a -> m a
takeMVar StrictMVar m (CursorState m h)
cursorState) (StrictMVar m (CursorState m h) -> CursorState m h -> m ()
forall (m :: * -> *) a. MonadMVar m => StrictMVar m a -> a -> m ()
putMVar StrictMVar m (CursorState m h)
cursorState) ((ActionRegistry m -> CursorState m h -> m (CursorState m h))
-> m ())
-> (ActionRegistry m -> CursorState m h -> m (CursorState m h))
-> m ()
forall a b. (a -> b) -> a -> b
$ \ActionRegistry m
reg -> \case
CursorState m h
CursorClosed -> CursorState m h -> m (CursorState m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure CursorState m h
forall (m :: * -> *) h. CursorState m h
CursorClosed
CursorOpen CursorEnv {Maybe (Readers m h)
Maybe (UnionCache m h)
Ref (WriteBufferBlobs m h)
Vector (Ref (Run m h))
CursorId
SessionEnv m h
Session m h
cursorSessionEnv :: forall (m :: * -> *) h. CursorEnv m h -> SessionEnv m h
cursorSession :: forall (m :: * -> *) h. CursorEnv m h -> Session m h
cursorId :: forall (m :: * -> *) h. CursorEnv m h -> CursorId
cursorWBB :: forall (m :: * -> *) h. CursorEnv m h -> Ref (WriteBufferBlobs m h)
cursorRuns :: forall (m :: * -> *) h. CursorEnv m h -> Vector (Ref (Run m h))
cursorReaders :: forall (m :: * -> *) h. CursorEnv m h -> Maybe (Readers m h)
cursorUnion :: forall (m :: * -> *) h. CursorEnv m h -> Maybe (UnionCache m h)
cursorSession :: Session m h
cursorSessionEnv :: SessionEnv m h
cursorId :: CursorId
cursorReaders :: Maybe (Readers m h)
cursorWBB :: Ref (WriteBufferBlobs m h)
cursorRuns :: Vector (Ref (Run m h))
cursorUnion :: Maybe (UnionCache m h)
..} -> do
ActionRegistry m -> m () -> m ()
forall (m :: * -> *).
(PrimMonad m, HasCallStack) =>
ActionRegistry m -> m () -> m ()
delayedCommit ActionRegistry m
reg (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
StrictMVar m (Map CursorId (Cursor m h))
-> (Map CursorId (Cursor m h) -> m (Map CursorId (Cursor m h)))
-> m ()
forall (m :: * -> *) a.
MonadMVar m =>
StrictMVar m a -> (a -> m a) -> m ()
modifyMVar_ (SessionEnv m h -> StrictMVar m (Map CursorId (Cursor m h))
forall (m :: * -> *) h.
SessionEnv m h -> StrictMVar m (Map CursorId (Cursor m h))
sessionOpenCursors SessionEnv m h
cursorSessionEnv) ((Map CursorId (Cursor m h) -> m (Map CursorId (Cursor m h)))
-> m ())
-> (Map CursorId (Cursor m h) -> m (Map CursorId (Cursor m h)))
-> m ()
forall a b. (a -> b) -> a -> b
$
Map CursorId (Cursor m h) -> m (Map CursorId (Cursor m h))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Map CursorId (Cursor m h) -> m (Map CursorId (Cursor m h)))
-> (Map CursorId (Cursor m h) -> Map CursorId (Cursor m h))
-> Map CursorId (Cursor m h)
-> m (Map CursorId (Cursor m h))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. CursorId -> Map CursorId (Cursor m h) -> Map CursorId (Cursor m h)
forall k a. Ord k => k -> Map k a -> Map k a
Map.delete CursorId
cursorId
Maybe (Readers m h) -> (Readers m h -> m ()) -> m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ Maybe (Readers m h)
cursorReaders ((Readers m h -> m ()) -> m ()) -> (Readers m h -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ ActionRegistry m -> m () -> m ()
forall (m :: * -> *).
(PrimMonad m, HasCallStack) =>
ActionRegistry m -> m () -> m ()
delayedCommit ActionRegistry m
reg (m () -> m ()) -> (Readers m h -> m ()) -> Readers m h -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Readers m h -> m ()
forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, PrimMonad m) =>
Readers m h -> m ()
Readers.close
Vector (Ref (Run m h)) -> (Ref (Run m h) -> m ()) -> m ()
forall (m :: * -> *) a b. Monad m => Vector a -> (a -> m b) -> m ()
V.forM_ Vector (Ref (Run m h))
cursorRuns ((Ref (Run m h) -> m ()) -> m ())
-> (Ref (Run m h) -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ ActionRegistry m -> m () -> m ()
forall (m :: * -> *).
(PrimMonad m, HasCallStack) =>
ActionRegistry m -> m () -> m ()
delayedCommit ActionRegistry m
reg (m () -> m ()) -> (Ref (Run m h) -> m ()) -> Ref (Run m h) -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Ref (Run m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef
Maybe (UnionCache m h) -> (UnionCache m h -> m ()) -> m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ Maybe (UnionCache m h)
cursorUnion ((UnionCache m h -> m ()) -> m ())
-> (UnionCache m h -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ ActionRegistry m -> UnionCache m h -> m ()
forall (m :: * -> *) h.
(PrimMonad m, MonadMask m) =>
ActionRegistry m -> UnionCache m h -> m ()
releaseUnionCache ActionRegistry m
reg
ActionRegistry m -> m () -> m ()
forall (m :: * -> *).
(PrimMonad m, HasCallStack) =>
ActionRegistry m -> m () -> m ()
delayedCommit ActionRegistry m
reg (Ref (WriteBufferBlobs m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef Ref (WriteBufferBlobs m h)
cursorWBB)
ActionRegistry m -> m () -> m ()
forall (m :: * -> *).
(PrimMonad m, HasCallStack) =>
ActionRegistry m -> m () -> m ()
delayedCommit ActionRegistry m
reg (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
Tracer m CursorTrace -> CursorTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith Tracer m CursorTrace
cursorTracer (CursorTrace -> m ()) -> CursorTrace -> m ()
forall a b. (a -> b) -> a -> b
$ CursorTrace
TraceClosedCursor
CursorState m h -> m (CursorState m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure CursorState m h
forall (m :: * -> *) h. CursorState m h
CursorClosed
{-# SPECIALISE readCursor ::
ResolveSerialisedValue
-> Int
-> Cursor IO h
-> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef IO h) -> res)
-> IO (V.Vector res) #-}
readCursor ::
forall m h res.
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m)
=> ResolveSerialisedValue
-> Int
-> Cursor m h
-> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef m h) -> res)
-> m (V.Vector res)
readCursor :: forall (m :: * -> *) h res.
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m) =>
ResolveSerialisedValue
-> Int
-> Cursor m h
-> (SerialisedKey
-> SerialisedValue -> Maybe (WeakBlobRef m h) -> res)
-> m (Vector res)
readCursor ResolveSerialisedValue
resolve Int
n Cursor m h
cursor SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef m h) -> res
fromEntry =
ResolveSerialisedValue
-> (SerialisedKey -> Bool)
-> Int
-> Cursor m h
-> (SerialisedKey
-> SerialisedValue -> Maybe (WeakBlobRef m h) -> res)
-> m (Vector res)
forall (m :: * -> *) h res.
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m) =>
ResolveSerialisedValue
-> (SerialisedKey -> Bool)
-> Int
-> Cursor m h
-> (SerialisedKey
-> SerialisedValue -> Maybe (WeakBlobRef m h) -> res)
-> m (Vector res)
readCursorWhile ResolveSerialisedValue
resolve (Bool -> SerialisedKey -> Bool
forall a b. a -> b -> a
const Bool
True) Int
n Cursor m h
cursor SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef m h) -> res
fromEntry
data CursorClosedError
= ErrCursorClosed
deriving stock (Int -> CursorClosedError -> ShowS
[CursorClosedError] -> ShowS
CursorClosedError -> String
(Int -> CursorClosedError -> ShowS)
-> (CursorClosedError -> String)
-> ([CursorClosedError] -> ShowS)
-> Show CursorClosedError
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> CursorClosedError -> ShowS
showsPrec :: Int -> CursorClosedError -> ShowS
$cshow :: CursorClosedError -> String
show :: CursorClosedError -> String
$cshowList :: [CursorClosedError] -> ShowS
showList :: [CursorClosedError] -> ShowS
Show, CursorClosedError -> CursorClosedError -> Bool
(CursorClosedError -> CursorClosedError -> Bool)
-> (CursorClosedError -> CursorClosedError -> Bool)
-> Eq CursorClosedError
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: CursorClosedError -> CursorClosedError -> Bool
== :: CursorClosedError -> CursorClosedError -> Bool
$c/= :: CursorClosedError -> CursorClosedError -> Bool
/= :: CursorClosedError -> CursorClosedError -> Bool
Eq)
deriving anyclass (Show CursorClosedError
Typeable CursorClosedError
(Typeable CursorClosedError, Show CursorClosedError) =>
(CursorClosedError -> SomeException)
-> (SomeException -> Maybe CursorClosedError)
-> (CursorClosedError -> String)
-> Exception CursorClosedError
SomeException -> Maybe CursorClosedError
CursorClosedError -> String
CursorClosedError -> SomeException
forall e.
(Typeable e, Show e) =>
(e -> SomeException)
-> (SomeException -> Maybe e) -> (e -> String) -> Exception e
$ctoException :: CursorClosedError -> SomeException
toException :: CursorClosedError -> SomeException
$cfromException :: SomeException -> Maybe CursorClosedError
fromException :: SomeException -> Maybe CursorClosedError
$cdisplayException :: CursorClosedError -> String
displayException :: CursorClosedError -> String
Exception)
{-# SPECIALISE readCursorWhile ::
ResolveSerialisedValue
-> (SerialisedKey -> Bool)
-> Int
-> Cursor IO h
-> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef IO h) -> res)
-> IO (V.Vector res) #-}
readCursorWhile ::
forall m h res.
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m)
=> ResolveSerialisedValue
-> (SerialisedKey -> Bool)
-> Int
-> Cursor m h
-> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef m h) -> res)
-> m (V.Vector res)
readCursorWhile :: forall (m :: * -> *) h res.
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m) =>
ResolveSerialisedValue
-> (SerialisedKey -> Bool)
-> Int
-> Cursor m h
-> (SerialisedKey
-> SerialisedValue -> Maybe (WeakBlobRef m h) -> res)
-> m (Vector res)
readCursorWhile ResolveSerialisedValue
resolve SerialisedKey -> Bool
keyIsWanted Int
n Cursor {Tracer m CursorTrace
StrictMVar m (CursorState m h)
cursorState :: forall (m :: * -> *) h.
Cursor m h -> StrictMVar m (CursorState m h)
cursorTracer :: forall (m :: * -> *) h. Cursor m h -> Tracer m CursorTrace
cursorState :: StrictMVar m (CursorState m h)
cursorTracer :: Tracer m CursorTrace
..} SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef m h) -> res
fromEntry = do
Tracer m CursorTrace -> CursorTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith Tracer m CursorTrace
cursorTracer (CursorTrace -> m ()) -> CursorTrace -> m ()
forall a b. (a -> b) -> a -> b
$ Int -> CursorTrace
TraceReadingCursor Int
n
Vector res
res <- StrictMVar m (CursorState m h)
-> (CursorState m h -> m (CursorState m h, Vector res))
-> m (Vector res)
forall (m :: * -> *) a b.
MonadMVar m =>
StrictMVar m a -> (a -> m (a, b)) -> m b
modifyMVar StrictMVar m (CursorState m h)
cursorState ((CursorState m h -> m (CursorState m h, Vector res))
-> m (Vector res))
-> (CursorState m h -> m (CursorState m h, Vector res))
-> m (Vector res)
forall a b. (a -> b) -> a -> b
$ \case
CursorState m h
CursorClosed -> CursorClosedError -> m (CursorState m h, Vector res)
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO CursorClosedError
ErrCursorClosed
state :: CursorState m h
state@(CursorOpen CursorEnv m h
cursorEnv) -> do
case CursorEnv m h -> Maybe (Readers m h)
forall (m :: * -> *) h. CursorEnv m h -> Maybe (Readers m h)
cursorReaders CursorEnv m h
cursorEnv of
Maybe (Readers m h)
Nothing ->
(CursorState m h, Vector res) -> m (CursorState m h, Vector res)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (CursorState m h
state, Vector res
forall a. Vector a
V.empty)
Just Readers m h
readers -> do
(Vector res
vec, HasMore
hasMore) <- ResolveSerialisedValue
-> (SerialisedKey -> Bool)
-> (SerialisedKey
-> SerialisedValue -> Maybe (WeakBlobRef m h) -> res)
-> Readers m h
-> Int
-> m (Vector res, HasMore)
forall h (m :: * -> *) res.
(MonadMask m, MonadST m, MonadSTM m) =>
ResolveSerialisedValue
-> (SerialisedKey -> Bool)
-> (SerialisedKey
-> SerialisedValue -> Maybe (WeakBlobRef m h) -> res)
-> Readers m h
-> Int
-> m (Vector res, HasMore)
Cursor.readEntriesWhile ResolveSerialisedValue
resolve SerialisedKey -> Bool
keyIsWanted SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef m h) -> res
fromEntry Readers m h
readers Int
n
let !state' :: CursorState m h
state' = case HasMore
hasMore of
HasMore
Readers.HasMore -> CursorState m h
state
HasMore
Readers.Drained -> CursorEnv m h -> CursorState m h
forall (m :: * -> *) h. CursorEnv m h -> CursorState m h
CursorOpen (CursorEnv m h
cursorEnv {cursorReaders = Nothing})
(CursorState m h, Vector res) -> m (CursorState m h, Vector res)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (CursorState m h
state', Vector res
vec)
Tracer m CursorTrace -> CursorTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith Tracer m CursorTrace
cursorTracer (CursorTrace -> m ()) -> CursorTrace -> m ()
forall a b. (a -> b) -> a -> b
$ Int -> Int -> CursorTrace
TraceReadCursor Int
n (Vector res -> Int
forall a. Vector a -> Int
V.length Vector res
res)
Vector res -> m (Vector res)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Vector res
res
data SnapshotExistsError
= ErrSnapshotExists !SnapshotName
deriving stock (Int -> SnapshotExistsError -> ShowS
[SnapshotExistsError] -> ShowS
SnapshotExistsError -> String
(Int -> SnapshotExistsError -> ShowS)
-> (SnapshotExistsError -> String)
-> ([SnapshotExistsError] -> ShowS)
-> Show SnapshotExistsError
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> SnapshotExistsError -> ShowS
showsPrec :: Int -> SnapshotExistsError -> ShowS
$cshow :: SnapshotExistsError -> String
show :: SnapshotExistsError -> String
$cshowList :: [SnapshotExistsError] -> ShowS
showList :: [SnapshotExistsError] -> ShowS
Show, SnapshotExistsError -> SnapshotExistsError -> Bool
(SnapshotExistsError -> SnapshotExistsError -> Bool)
-> (SnapshotExistsError -> SnapshotExistsError -> Bool)
-> Eq SnapshotExistsError
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: SnapshotExistsError -> SnapshotExistsError -> Bool
== :: SnapshotExistsError -> SnapshotExistsError -> Bool
$c/= :: SnapshotExistsError -> SnapshotExistsError -> Bool
/= :: SnapshotExistsError -> SnapshotExistsError -> Bool
Eq)
deriving anyclass (Show SnapshotExistsError
Typeable SnapshotExistsError
(Typeable SnapshotExistsError, Show SnapshotExistsError) =>
(SnapshotExistsError -> SomeException)
-> (SomeException -> Maybe SnapshotExistsError)
-> (SnapshotExistsError -> String)
-> Exception SnapshotExistsError
SomeException -> Maybe SnapshotExistsError
SnapshotExistsError -> String
SnapshotExistsError -> SomeException
forall e.
(Typeable e, Show e) =>
(e -> SomeException)
-> (SomeException -> Maybe e) -> (e -> String) -> Exception e
$ctoException :: SnapshotExistsError -> SomeException
toException :: SnapshotExistsError -> SomeException
$cfromException :: SomeException -> Maybe SnapshotExistsError
fromException :: SomeException -> Maybe SnapshotExistsError
$cdisplayException :: SnapshotExistsError -> String
displayException :: SnapshotExistsError -> String
Exception)
{-# SPECIALISE saveSnapshot ::
SnapshotName
-> SnapshotLabel
-> Table IO h
-> IO () #-}
saveSnapshot ::
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m)
=> SnapshotName
-> SnapshotLabel
-> Table m h
-> m ()
saveSnapshot :: forall (m :: * -> *) h.
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m) =>
SnapshotName -> SnapshotLabel -> Table m h -> m ()
saveSnapshot SnapshotName
snap SnapshotLabel
label Table m h
t = do
Tracer m TableTrace -> TableTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith (Table m h -> Tracer m TableTrace
forall (m :: * -> *) h. Table m h -> Tracer m TableTrace
tableTracer Table m h
t) (TableTrace -> m ()) -> TableTrace -> m ()
forall a b. (a -> b) -> a -> b
$ SnapshotName -> TableTrace
TraceSaveSnapshot SnapshotName
snap
Table m h -> (TableEnv m h -> m ()) -> m ()
forall (m :: * -> *) h a.
(MonadSTM m, MonadThrow m) =>
Table m h -> (TableEnv m h -> m a) -> m a
withKeepTableOpen Table m h
t ((TableEnv m h -> m ()) -> m ()) -> (TableEnv m h -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \TableEnv m h
tEnv ->
(ActionRegistry m -> m ()) -> m ()
forall (m :: * -> *) a.
(PrimMonad m, MonadCatch m) =>
(ActionRegistry m -> m a) -> m a
withActionRegistry ((ActionRegistry m -> m ()) -> m ())
-> (ActionRegistry m -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \ActionRegistry m
reg -> do
let hfs :: HasFS m h
hfs = TableEnv m h -> HasFS m h
forall (m :: * -> *) h. TableEnv m h -> HasFS m h
tableHasFS TableEnv m h
tEnv
hbio :: HasBlockIO m h
hbio = TableEnv m h -> HasBlockIO m h
forall (m :: * -> *) h. TableEnv m h -> HasBlockIO m h
tableHasBlockIO TableEnv m h
tEnv
activeUc :: UniqCounter m
activeUc = Table m h -> UniqCounter m
forall (m :: * -> *) h. Table m h -> UniqCounter m
tableSessionUniqCounter Table m h
t
let snapDir :: NamedSnapshotDir
snapDir = SessionRoot -> SnapshotName -> NamedSnapshotDir
Paths.namedSnapshotDir (TableEnv m h -> SessionRoot
forall (m :: * -> *) h. TableEnv m h -> SessionRoot
tableSessionRoot TableEnv m h
tEnv) SnapshotName
snap
Bool
snapshotExists <- SnapshotName -> SessionEnv m h -> m Bool
forall (m :: * -> *) h. SnapshotName -> SessionEnv m h -> m Bool
doesSnapshotDirExist SnapshotName
snap (TableEnv m h -> SessionEnv m h
forall (m :: * -> *) h. TableEnv m h -> SessionEnv m h
tableSessionEnv TableEnv m h
tEnv)
if Bool
snapshotExists then
SnapshotExistsError -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (SnapshotName -> SnapshotExistsError
ErrSnapshotExists SnapshotName
snap)
else
ActionRegistry m -> m () -> m () -> m ()
forall (m :: * -> *) a.
(PrimMonad m, MonadMask m, HasCallStack) =>
ActionRegistry m -> m a -> m () -> m a
withRollback_ ActionRegistry m
reg
(HasFS m h -> HasCallStack => FsPath -> m ()
forall (m :: * -> *) h. HasFS m h -> HasCallStack => FsPath -> m ()
FS.createDirectory HasFS m h
hfs (NamedSnapshotDir -> FsPath
Paths.getNamedSnapshotDir NamedSnapshotDir
snapDir))
(HasFS m h -> HasCallStack => FsPath -> m ()
forall (m :: * -> *) h. HasFS m h -> HasCallStack => FsPath -> m ()
FS.removeDirectoryRecursive HasFS m h
hfs (NamedSnapshotDir -> FsPath
Paths.getNamedSnapshotDir NamedSnapshotDir
snapDir))
TableContent m h
content <- RWVar m (TableContent m h)
-> (TableContent m h -> m (TableContent m h))
-> m (TableContent m h)
forall (m :: * -> *) a b.
(MonadSTM m, MonadThrow m) =>
RWVar m a -> (a -> m b) -> m b
RW.withReadAccess (TableEnv m h -> RWVar m (TableContent m h)
forall (m :: * -> *) h. TableEnv m h -> RWVar m (TableContent m h)
tableContent TableEnv m h
tEnv) (ActionRegistry m -> TableContent m h -> m (TableContent m h)
forall (m :: * -> *) h.
(PrimMonad m, MonadMask m) =>
ActionRegistry m -> TableContent m h -> m (TableContent m h)
duplicateTableContent ActionRegistry m
reg)
UniqCounter m
snapUc <- Int -> m (UniqCounter m)
forall (m :: * -> *). PrimMonad m => Int -> m (UniqCounter m)
newUniqCounter Int
0
let activeDir :: ActiveDir
activeDir = SessionRoot -> ActiveDir
Paths.activeDir (TableEnv m h -> SessionRoot
forall (m :: * -> *) h. TableEnv m h -> SessionRoot
tableSessionRoot TableEnv m h
tEnv)
let wb :: WriteBuffer
wb = TableContent m h -> WriteBuffer
forall (m :: * -> *) h. TableContent m h -> WriteBuffer
tableWriteBuffer TableContent m h
content
let wbb :: Ref (WriteBufferBlobs m h)
wbb = TableContent m h -> Ref (WriteBufferBlobs m h)
forall (m :: * -> *) h.
TableContent m h -> Ref (WriteBufferBlobs m h)
tableWriteBufferBlobs TableContent m h
content
RunNumber
snapWriteBufferNumber <- WriteBufferFsPaths -> RunNumber
Paths.writeBufferNumber (WriteBufferFsPaths -> RunNumber)
-> m WriteBufferFsPaths -> m RunNumber
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
HasFS m h
-> HasBlockIO m h
-> UniqCounter m
-> UniqCounter m
-> ActionRegistry m
-> ActiveDir
-> NamedSnapshotDir
-> WriteBuffer
-> Ref (WriteBufferBlobs m h)
-> m WriteBufferFsPaths
forall (m :: * -> *) h.
(MonadMVar m, MonadSTM m, MonadST m, MonadMask m) =>
HasFS m h
-> HasBlockIO m h
-> UniqCounter m
-> UniqCounter m
-> ActionRegistry m
-> ActiveDir
-> NamedSnapshotDir
-> WriteBuffer
-> Ref (WriteBufferBlobs m h)
-> m WriteBufferFsPaths
snapshotWriteBuffer HasFS m h
hfs HasBlockIO m h
hbio UniqCounter m
activeUc UniqCounter m
snapUc ActionRegistry m
reg ActiveDir
activeDir NamedSnapshotDir
snapDir WriteBuffer
wb Ref (WriteBufferBlobs m h)
wbb
SnapLevels (Ref (Run m h))
snapLevels <- Levels m h -> m (SnapLevels (Ref (Run m h)))
forall (m :: * -> *) h.
(PrimMonad m, MonadMVar m) =>
Levels m h -> m (SnapLevels (Ref (Run m h)))
toSnapLevels (TableContent m h -> Levels m h
forall (m :: * -> *) h. TableContent m h -> Levels m h
tableLevels TableContent m h
content)
SnapLevels SnapshotRun
snapLevels' <- (Ref (Run m h) -> m SnapshotRun)
-> SnapLevels (Ref (Run m h)) -> m (SnapLevels SnapshotRun)
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> SnapLevels a -> f (SnapLevels b)
traverse (HasFS m h
-> HasBlockIO m h
-> UniqCounter m
-> ActionRegistry m
-> NamedSnapshotDir
-> Ref (Run m h)
-> m SnapshotRun
forall (m :: * -> *) h.
(MonadMask m, PrimMonad m) =>
HasFS m h
-> HasBlockIO m h
-> UniqCounter m
-> ActionRegistry m
-> NamedSnapshotDir
-> Ref (Run m h)
-> m SnapshotRun
snapshotRun HasFS m h
hfs HasBlockIO m h
hbio UniqCounter m
snapUc ActionRegistry m
reg NamedSnapshotDir
snapDir) SnapLevels (Ref (Run m h))
snapLevels
Maybe (SnapMergingTree SnapshotRun)
mTreeOpt <- case TableContent m h -> UnionLevel m h
forall (m :: * -> *) h. TableContent m h -> UnionLevel m h
tableUnionLevel TableContent m h
content of
UnionLevel m h
NoUnion -> Maybe (SnapMergingTree SnapshotRun)
-> m (Maybe (SnapMergingTree SnapshotRun))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe (SnapMergingTree SnapshotRun)
forall a. Maybe a
Nothing
Union Ref (MergingTree m h)
mTreeRef UnionCache m h
_cache -> do
SnapMergingTree (Ref (Run m h))
mTree <- Ref (MergingTree m h) -> m (SnapMergingTree (Ref (Run m h)))
forall (m :: * -> *) h.
(PrimMonad m, MonadMVar m) =>
Ref (MergingTree m h) -> m (SnapMergingTree (Ref (Run m h)))
toSnapMergingTree Ref (MergingTree m h)
mTreeRef
SnapMergingTree SnapshotRun -> Maybe (SnapMergingTree SnapshotRun)
forall a. a -> Maybe a
Just (SnapMergingTree SnapshotRun
-> Maybe (SnapMergingTree SnapshotRun))
-> m (SnapMergingTree SnapshotRun)
-> m (Maybe (SnapMergingTree SnapshotRun))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (Ref (Run m h) -> m SnapshotRun)
-> SnapMergingTree (Ref (Run m h))
-> m (SnapMergingTree SnapshotRun)
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> SnapMergingTree a -> f (SnapMergingTree b)
traverse (HasFS m h
-> HasBlockIO m h
-> UniqCounter m
-> ActionRegistry m
-> NamedSnapshotDir
-> Ref (Run m h)
-> m SnapshotRun
forall (m :: * -> *) h.
(MonadMask m, PrimMonad m) =>
HasFS m h
-> HasBlockIO m h
-> UniqCounter m
-> ActionRegistry m
-> NamedSnapshotDir
-> Ref (Run m h)
-> m SnapshotRun
snapshotRun HasFS m h
hfs HasBlockIO m h
hbio UniqCounter m
snapUc ActionRegistry m
reg NamedSnapshotDir
snapDir) SnapMergingTree (Ref (Run m h))
mTree
ActionRegistry m -> TableContent m h -> m ()
forall (m :: * -> *) h.
(PrimMonad m, MonadMask m) =>
ActionRegistry m -> TableContent m h -> m ()
releaseTableContent ActionRegistry m
reg TableContent m h
content
let snapMetaData :: SnapshotMetaData
snapMetaData = SnapshotLabel
-> TableConfig
-> RunNumber
-> SnapLevels SnapshotRun
-> Maybe (SnapMergingTree SnapshotRun)
-> SnapshotMetaData
SnapshotMetaData
SnapshotLabel
label
(Table m h -> TableConfig
forall (m :: * -> *) h. Table m h -> TableConfig
tableConfig Table m h
t)
RunNumber
snapWriteBufferNumber
SnapLevels SnapshotRun
snapLevels'
Maybe (SnapMergingTree SnapshotRun)
mTreeOpt
SnapshotMetaDataFile FsPath
contentPath = NamedSnapshotDir -> SnapshotMetaDataFile
Paths.snapshotMetaDataFile NamedSnapshotDir
snapDir
SnapshotMetaDataChecksumFile FsPath
checksumPath = NamedSnapshotDir -> SnapshotMetaDataChecksumFile
Paths.snapshotMetaDataChecksumFile NamedSnapshotDir
snapDir
HasFS m h -> FsPath -> FsPath -> SnapshotMetaData -> m ()
forall (m :: * -> *) h.
MonadThrow m =>
HasFS m h -> FsPath -> FsPath -> SnapshotMetaData -> m ()
writeFileSnapshotMetaData HasFS m h
hfs FsPath
contentPath FsPath
checksumPath SnapshotMetaData
snapMetaData
HasFS m h -> HasBlockIO m h -> FsPath -> m ()
forall (m :: * -> *) h.
MonadThrow m =>
HasFS m h -> HasBlockIO m h -> FsPath -> m ()
FS.synchroniseDirectoryRecursive HasFS m h
hfs HasBlockIO m h
hbio (NamedSnapshotDir -> FsPath
Paths.getNamedSnapshotDir NamedSnapshotDir
snapDir)
ActionRegistry m -> m () -> m ()
forall (m :: * -> *).
(PrimMonad m, HasCallStack) =>
ActionRegistry m -> m () -> m ()
delayedCommit ActionRegistry m
reg (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
Tracer m TableTrace -> TableTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith (Table m h -> Tracer m TableTrace
forall (m :: * -> *) h. Table m h -> Tracer m TableTrace
tableTracer Table m h
t) (TableTrace -> m ()) -> TableTrace -> m ()
forall a b. (a -> b) -> a -> b
$ SnapshotName -> TableTrace
TraceSavedSnapshot SnapshotName
snap
data SnapshotDoesNotExistError
= ErrSnapshotDoesNotExist !SnapshotName
deriving stock (Int -> SnapshotDoesNotExistError -> ShowS
[SnapshotDoesNotExistError] -> ShowS
SnapshotDoesNotExistError -> String
(Int -> SnapshotDoesNotExistError -> ShowS)
-> (SnapshotDoesNotExistError -> String)
-> ([SnapshotDoesNotExistError] -> ShowS)
-> Show SnapshotDoesNotExistError
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> SnapshotDoesNotExistError -> ShowS
showsPrec :: Int -> SnapshotDoesNotExistError -> ShowS
$cshow :: SnapshotDoesNotExistError -> String
show :: SnapshotDoesNotExistError -> String
$cshowList :: [SnapshotDoesNotExistError] -> ShowS
showList :: [SnapshotDoesNotExistError] -> ShowS
Show, SnapshotDoesNotExistError -> SnapshotDoesNotExistError -> Bool
(SnapshotDoesNotExistError -> SnapshotDoesNotExistError -> Bool)
-> (SnapshotDoesNotExistError -> SnapshotDoesNotExistError -> Bool)
-> Eq SnapshotDoesNotExistError
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: SnapshotDoesNotExistError -> SnapshotDoesNotExistError -> Bool
== :: SnapshotDoesNotExistError -> SnapshotDoesNotExistError -> Bool
$c/= :: SnapshotDoesNotExistError -> SnapshotDoesNotExistError -> Bool
/= :: SnapshotDoesNotExistError -> SnapshotDoesNotExistError -> Bool
Eq)
deriving anyclass (Show SnapshotDoesNotExistError
Typeable SnapshotDoesNotExistError
(Typeable SnapshotDoesNotExistError,
Show SnapshotDoesNotExistError) =>
(SnapshotDoesNotExistError -> SomeException)
-> (SomeException -> Maybe SnapshotDoesNotExistError)
-> (SnapshotDoesNotExistError -> String)
-> Exception SnapshotDoesNotExistError
SomeException -> Maybe SnapshotDoesNotExistError
SnapshotDoesNotExistError -> String
SnapshotDoesNotExistError -> SomeException
forall e.
(Typeable e, Show e) =>
(e -> SomeException)
-> (SomeException -> Maybe e) -> (e -> String) -> Exception e
$ctoException :: SnapshotDoesNotExistError -> SomeException
toException :: SnapshotDoesNotExistError -> SomeException
$cfromException :: SomeException -> Maybe SnapshotDoesNotExistError
fromException :: SomeException -> Maybe SnapshotDoesNotExistError
$cdisplayException :: SnapshotDoesNotExistError -> String
displayException :: SnapshotDoesNotExistError -> String
Exception)
data SnapshotCorruptedError
= ErrSnapshotCorrupted
!SnapshotName
!FileCorruptedError
deriving stock (Int -> SnapshotCorruptedError -> ShowS
[SnapshotCorruptedError] -> ShowS
SnapshotCorruptedError -> String
(Int -> SnapshotCorruptedError -> ShowS)
-> (SnapshotCorruptedError -> String)
-> ([SnapshotCorruptedError] -> ShowS)
-> Show SnapshotCorruptedError
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> SnapshotCorruptedError -> ShowS
showsPrec :: Int -> SnapshotCorruptedError -> ShowS
$cshow :: SnapshotCorruptedError -> String
show :: SnapshotCorruptedError -> String
$cshowList :: [SnapshotCorruptedError] -> ShowS
showList :: [SnapshotCorruptedError] -> ShowS
Show, SnapshotCorruptedError -> SnapshotCorruptedError -> Bool
(SnapshotCorruptedError -> SnapshotCorruptedError -> Bool)
-> (SnapshotCorruptedError -> SnapshotCorruptedError -> Bool)
-> Eq SnapshotCorruptedError
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: SnapshotCorruptedError -> SnapshotCorruptedError -> Bool
== :: SnapshotCorruptedError -> SnapshotCorruptedError -> Bool
$c/= :: SnapshotCorruptedError -> SnapshotCorruptedError -> Bool
/= :: SnapshotCorruptedError -> SnapshotCorruptedError -> Bool
Eq)
deriving anyclass (Show SnapshotCorruptedError
Typeable SnapshotCorruptedError
(Typeable SnapshotCorruptedError, Show SnapshotCorruptedError) =>
(SnapshotCorruptedError -> SomeException)
-> (SomeException -> Maybe SnapshotCorruptedError)
-> (SnapshotCorruptedError -> String)
-> Exception SnapshotCorruptedError
SomeException -> Maybe SnapshotCorruptedError
SnapshotCorruptedError -> String
SnapshotCorruptedError -> SomeException
forall e.
(Typeable e, Show e) =>
(e -> SomeException)
-> (SomeException -> Maybe e) -> (e -> String) -> Exception e
$ctoException :: SnapshotCorruptedError -> SomeException
toException :: SnapshotCorruptedError -> SomeException
$cfromException :: SomeException -> Maybe SnapshotCorruptedError
fromException :: SomeException -> Maybe SnapshotCorruptedError
$cdisplayException :: SnapshotCorruptedError -> String
displayException :: SnapshotCorruptedError -> String
Exception)
data SnapshotNotCompatibleError
=
ErrSnapshotWrongLabel
!SnapshotName
!SnapshotLabel
!SnapshotLabel
deriving stock (Int -> SnapshotNotCompatibleError -> ShowS
[SnapshotNotCompatibleError] -> ShowS
SnapshotNotCompatibleError -> String
(Int -> SnapshotNotCompatibleError -> ShowS)
-> (SnapshotNotCompatibleError -> String)
-> ([SnapshotNotCompatibleError] -> ShowS)
-> Show SnapshotNotCompatibleError
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> SnapshotNotCompatibleError -> ShowS
showsPrec :: Int -> SnapshotNotCompatibleError -> ShowS
$cshow :: SnapshotNotCompatibleError -> String
show :: SnapshotNotCompatibleError -> String
$cshowList :: [SnapshotNotCompatibleError] -> ShowS
showList :: [SnapshotNotCompatibleError] -> ShowS
Show, SnapshotNotCompatibleError -> SnapshotNotCompatibleError -> Bool
(SnapshotNotCompatibleError -> SnapshotNotCompatibleError -> Bool)
-> (SnapshotNotCompatibleError
-> SnapshotNotCompatibleError -> Bool)
-> Eq SnapshotNotCompatibleError
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: SnapshotNotCompatibleError -> SnapshotNotCompatibleError -> Bool
== :: SnapshotNotCompatibleError -> SnapshotNotCompatibleError -> Bool
$c/= :: SnapshotNotCompatibleError -> SnapshotNotCompatibleError -> Bool
/= :: SnapshotNotCompatibleError -> SnapshotNotCompatibleError -> Bool
Eq)
deriving anyclass (Show SnapshotNotCompatibleError
Typeable SnapshotNotCompatibleError
(Typeable SnapshotNotCompatibleError,
Show SnapshotNotCompatibleError) =>
(SnapshotNotCompatibleError -> SomeException)
-> (SomeException -> Maybe SnapshotNotCompatibleError)
-> (SnapshotNotCompatibleError -> String)
-> Exception SnapshotNotCompatibleError
SomeException -> Maybe SnapshotNotCompatibleError
SnapshotNotCompatibleError -> String
SnapshotNotCompatibleError -> SomeException
forall e.
(Typeable e, Show e) =>
(e -> SomeException)
-> (SomeException -> Maybe e) -> (e -> String) -> Exception e
$ctoException :: SnapshotNotCompatibleError -> SomeException
toException :: SnapshotNotCompatibleError -> SomeException
$cfromException :: SomeException -> Maybe SnapshotNotCompatibleError
fromException :: SomeException -> Maybe SnapshotNotCompatibleError
$cdisplayException :: SnapshotNotCompatibleError -> String
displayException :: SnapshotNotCompatibleError -> String
Exception)
{-# SPECIALISE openTableFromSnapshot ::
TableConfigOverride
-> Session IO h
-> SnapshotName
-> SnapshotLabel
-> ResolveSerialisedValue
-> IO (Table IO h) #-}
openTableFromSnapshot ::
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m)
=> TableConfigOverride
-> Session m h
-> SnapshotName
-> SnapshotLabel
-> ResolveSerialisedValue
-> m (Table m h)
openTableFromSnapshot :: forall (m :: * -> *) h.
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m) =>
TableConfigOverride
-> Session m h
-> SnapshotName
-> SnapshotLabel
-> ResolveSerialisedValue
-> m (Table m h)
openTableFromSnapshot TableConfigOverride
policyOveride Session m h
sesh SnapshotName
snap SnapshotLabel
label ResolveSerialisedValue
resolve = do
TableId
tableId <- Unique -> TableId
uniqueToTableId (Unique -> TableId) -> m Unique -> m TableId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> UniqCounter m -> m Unique
forall (m :: * -> *). PrimMonad m => UniqCounter m -> m Unique
incrUniqCounter (Session m h -> UniqCounter m
forall (m :: * -> *) h. Session m h -> UniqCounter m
sessionUniqCounter Session m h
sesh)
let tr :: Tracer m TableTrace
tr = TableId -> TableTrace -> LSMTreeTrace
TraceTable TableId
tableId (TableTrace -> LSMTreeTrace)
-> Tracer m LSMTreeTrace -> Tracer m TableTrace
forall a' a. (a' -> a) -> Tracer m a -> Tracer m a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
`contramap` Session m h -> Tracer m LSMTreeTrace
forall (m :: * -> *) h. Session m h -> Tracer m LSMTreeTrace
lsmTreeTracer Session m h
sesh
Tracer m TableTrace -> TableTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith Tracer m TableTrace
tr (TableTrace -> m ()) -> TableTrace -> m ()
forall a b. (a -> b) -> a -> b
$ SnapshotName -> TableConfigOverride -> TableTrace
TraceOpenTableFromSnapshot SnapshotName
snap TableConfigOverride
policyOveride
SnapshotName -> m (Table m h) -> m (Table m h)
forall (m :: * -> *) a. MonadCatch m => SnapshotName -> m a -> m a
wrapFileCorruptedErrorAsSnapshotCorruptedError SnapshotName
snap (m (Table m h) -> m (Table m h)) -> m (Table m h) -> m (Table m h)
forall a b. (a -> b) -> a -> b
$ do
Session m h -> (SessionEnv m h -> m (Table m h)) -> m (Table m h)
forall (m :: * -> *) h a.
(MonadSTM m, MonadThrow m) =>
Session m h -> (SessionEnv m h -> m a) -> m a
withKeepSessionOpen Session m h
sesh ((SessionEnv m h -> m (Table m h)) -> m (Table m h))
-> (SessionEnv m h -> m (Table m h)) -> m (Table m h)
forall a b. (a -> b) -> a -> b
$ \SessionEnv m h
seshEnv -> do
(ActionRegistry m -> m (Table m h)) -> m (Table m h)
forall (m :: * -> *) a.
(PrimMonad m, MonadCatch m) =>
(ActionRegistry m -> m a) -> m a
withActionRegistry ((ActionRegistry m -> m (Table m h)) -> m (Table m h))
-> (ActionRegistry m -> m (Table m h)) -> m (Table m h)
forall a b. (a -> b) -> a -> b
$ \ActionRegistry m
reg -> do
let hfs :: HasFS m h
hfs = SessionEnv m h -> HasFS m h
forall (m :: * -> *) h. SessionEnv m h -> HasFS m h
sessionHasFS SessionEnv m h
seshEnv
hbio :: HasBlockIO m h
hbio = SessionEnv m h -> HasBlockIO m h
forall (m :: * -> *) h. SessionEnv m h -> HasBlockIO m h
sessionHasBlockIO SessionEnv m h
seshEnv
uc :: UniqCounter m
uc = Session m h -> UniqCounter m
forall (m :: * -> *) h. Session m h -> UniqCounter m
sessionUniqCounter Session m h
sesh
let snapDir :: NamedSnapshotDir
snapDir = SessionRoot -> SnapshotName -> NamedSnapshotDir
Paths.namedSnapshotDir (SessionEnv m h -> SessionRoot
forall (m :: * -> *) h. SessionEnv m h -> SessionRoot
sessionRoot SessionEnv m h
seshEnv) SnapshotName
snap
HasFS m h -> HasCallStack => FsPath -> m Bool
forall (m :: * -> *) h.
HasFS m h -> HasCallStack => FsPath -> m Bool
FS.doesDirectoryExist HasFS m h
hfs (NamedSnapshotDir -> FsPath
Paths.getNamedSnapshotDir NamedSnapshotDir
snapDir) m Bool -> (Bool -> m ()) -> m ()
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \Bool
b ->
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
b (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ SnapshotDoesNotExistError -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (SnapshotName -> SnapshotDoesNotExistError
ErrSnapshotDoesNotExist SnapshotName
snap)
let SnapshotMetaDataFile FsPath
contentPath = NamedSnapshotDir -> SnapshotMetaDataFile
Paths.snapshotMetaDataFile NamedSnapshotDir
snapDir
SnapshotMetaDataChecksumFile FsPath
checksumPath = NamedSnapshotDir -> SnapshotMetaDataChecksumFile
Paths.snapshotMetaDataChecksumFile NamedSnapshotDir
snapDir
SnapshotMetaData
snapMetaData <- HasFS m h -> FsPath -> FsPath -> m SnapshotMetaData
forall (m :: * -> *) h.
MonadThrow m =>
HasFS m h -> FsPath -> FsPath -> m SnapshotMetaData
readFileSnapshotMetaData HasFS m h
hfs FsPath
contentPath FsPath
checksumPath
let SnapshotMetaData SnapshotLabel
label' TableConfig
conf RunNumber
snapWriteBuffer SnapLevels SnapshotRun
snapLevels Maybe (SnapMergingTree SnapshotRun)
mTreeOpt
= TableConfigOverride -> SnapshotMetaData -> SnapshotMetaData
overrideTableConfig TableConfigOverride
policyOveride SnapshotMetaData
snapMetaData
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (SnapshotLabel
label SnapshotLabel -> SnapshotLabel -> Bool
forall a. Eq a => a -> a -> Bool
== SnapshotLabel
label') (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
SnapshotNotCompatibleError -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (SnapshotName
-> SnapshotLabel -> SnapshotLabel -> SnapshotNotCompatibleError
ErrSnapshotWrongLabel SnapshotName
snap SnapshotLabel
label SnapshotLabel
label')
ArenaManager (PrimState m)
am <- m (ArenaManager (PrimState m))
forall (m :: * -> *). PrimMonad m => m (ArenaManager (PrimState m))
newArenaManager
let salt :: Salt
salt = SessionEnv m h -> Salt
forall (m :: * -> *) h. SessionEnv m h -> Salt
sessionSalt SessionEnv m h
seshEnv
let activeDir :: ActiveDir
activeDir = SessionRoot -> ActiveDir
Paths.activeDir (SessionEnv m h -> SessionRoot
forall (m :: * -> *) h. SessionEnv m h -> SessionRoot
sessionRoot SessionEnv m h
seshEnv)
let snapWriteBufferPaths :: WriteBufferFsPaths
snapWriteBufferPaths = FsPath -> RunNumber -> WriteBufferFsPaths
Paths.WriteBufferFsPaths (NamedSnapshotDir -> FsPath
Paths.getNamedSnapshotDir NamedSnapshotDir
snapDir) RunNumber
snapWriteBuffer
(WriteBuffer
tableWriteBuffer, Ref (WriteBufferBlobs m h)
tableWriteBufferBlobs) <-
ActionRegistry m
-> ResolveSerialisedValue
-> HasFS m h
-> HasBlockIO m h
-> UniqCounter m
-> ActiveDir
-> WriteBufferFsPaths
-> m (WriteBuffer, Ref (WriteBufferBlobs m h))
forall (m :: * -> *) h.
(MonadMVar m, MonadMask m, MonadSTM m, MonadST m) =>
ActionRegistry m
-> ResolveSerialisedValue
-> HasFS m h
-> HasBlockIO m h
-> UniqCounter m
-> ActiveDir
-> WriteBufferFsPaths
-> m (WriteBuffer, Ref (WriteBufferBlobs m h))
openWriteBuffer ActionRegistry m
reg ResolveSerialisedValue
resolve HasFS m h
hfs HasBlockIO m h
hbio UniqCounter m
uc ActiveDir
activeDir WriteBufferFsPaths
snapWriteBufferPaths
SnapLevels (Ref (Run m h))
snapLevels' <- (SnapshotRun -> m (Ref (Run m h)))
-> SnapLevels SnapshotRun -> m (SnapLevels (Ref (Run m h)))
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> SnapLevels a -> f (SnapLevels b)
traverse (HasFS m h
-> HasBlockIO m h
-> UniqCounter m
-> ActionRegistry m
-> NamedSnapshotDir
-> ActiveDir
-> Salt
-> SnapshotRun
-> m (Ref (Run m h))
forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadST m) =>
HasFS m h
-> HasBlockIO m h
-> UniqCounter m
-> ActionRegistry m
-> NamedSnapshotDir
-> ActiveDir
-> Salt
-> SnapshotRun
-> m (Ref (Run m h))
openRun HasFS m h
hfs HasBlockIO m h
hbio UniqCounter m
uc ActionRegistry m
reg NamedSnapshotDir
snapDir ActiveDir
activeDir Salt
salt) SnapLevels SnapshotRun
snapLevels
UnionLevel m h
unionLevel <- case Maybe (SnapMergingTree SnapshotRun)
mTreeOpt of
Maybe (SnapMergingTree SnapshotRun)
Nothing -> UnionLevel m h -> m (UnionLevel m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure UnionLevel m h
forall (m :: * -> *) h. UnionLevel m h
NoUnion
Just SnapMergingTree SnapshotRun
mTree -> do
SnapMergingTree (Ref (Run m h))
snapTree <- (SnapshotRun -> m (Ref (Run m h)))
-> SnapMergingTree SnapshotRun
-> m (SnapMergingTree (Ref (Run m h)))
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> SnapMergingTree a -> f (SnapMergingTree b)
traverse (HasFS m h
-> HasBlockIO m h
-> UniqCounter m
-> ActionRegistry m
-> NamedSnapshotDir
-> ActiveDir
-> Salt
-> SnapshotRun
-> m (Ref (Run m h))
forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadST m) =>
HasFS m h
-> HasBlockIO m h
-> UniqCounter m
-> ActionRegistry m
-> NamedSnapshotDir
-> ActiveDir
-> Salt
-> SnapshotRun
-> m (Ref (Run m h))
openRun HasFS m h
hfs HasBlockIO m h
hbio UniqCounter m
uc ActionRegistry m
reg NamedSnapshotDir
snapDir ActiveDir
activeDir Salt
salt) SnapMergingTree SnapshotRun
mTree
Ref (MergingTree m h)
mt <- HasFS m h
-> HasBlockIO m h
-> Salt
-> UniqCounter m
-> ResolveSerialisedValue
-> ActiveDir
-> ActionRegistry m
-> SnapMergingTree (Ref (Run m h))
-> m (Ref (MergingTree m h))
forall (m :: * -> *) h.
(MonadMask m, MonadMVar m, MonadSTM m, MonadST m) =>
HasFS m h
-> HasBlockIO m h
-> Salt
-> UniqCounter m
-> ResolveSerialisedValue
-> ActiveDir
-> ActionRegistry m
-> SnapMergingTree (Ref (Run m h))
-> m (Ref (MergingTree m h))
fromSnapMergingTree HasFS m h
hfs HasBlockIO m h
hbio Salt
salt UniqCounter m
uc ResolveSerialisedValue
resolve ActiveDir
activeDir ActionRegistry m
reg SnapMergingTree (Ref (Run m h))
snapTree
Ref (MergingTree m h) -> m Bool
forall (m :: * -> *) h.
MonadMVar m =>
Ref (MergingTree m h) -> m Bool
isStructurallyEmpty Ref (MergingTree m h)
mt m Bool -> (Bool -> m (UnionLevel m h)) -> m (UnionLevel m h)
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Bool
True ->
UnionLevel m h -> m (UnionLevel m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure UnionLevel m h
forall (m :: * -> *) h. UnionLevel m h
NoUnion
Bool
False -> do
(Ref (Run m h) -> m ()) -> SnapMergingTree (Ref (Run m h)) -> m ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ (ActionRegistry m -> m () -> m ()
forall (m :: * -> *).
(PrimMonad m, HasCallStack) =>
ActionRegistry m -> m () -> m ()
delayedCommit ActionRegistry m
reg (m () -> m ()) -> (Ref (Run m h) -> m ()) -> Ref (Run m h) -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Ref (Run m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef) SnapMergingTree (Ref (Run m h))
snapTree
UnionCache m h
cache <- ActionRegistry m -> Ref (MergingTree m h) -> m (UnionCache m h)
forall (m :: * -> *) h.
(PrimMonad m, MonadMVar m, MonadMask m) =>
ActionRegistry m -> Ref (MergingTree m h) -> m (UnionCache m h)
mkUnionCache ActionRegistry m
reg Ref (MergingTree m h)
mt
UnionLevel m h -> m (UnionLevel m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Ref (MergingTree m h) -> UnionCache m h -> UnionLevel m h
forall (m :: * -> *) h.
Ref (MergingTree m h) -> UnionCache m h -> UnionLevel m h
Union Ref (MergingTree m h)
mt UnionCache m h
cache)
Levels m h
tableLevels <- HasFS m h
-> HasBlockIO m h
-> Salt
-> UniqCounter m
-> TableConfig
-> ResolveSerialisedValue
-> ActionRegistry m
-> ActiveDir
-> SnapLevels (Ref (Run m h))
-> m (Levels m h)
forall (m :: * -> *) h.
(MonadMask m, MonadMVar m, MonadSTM m, MonadST m) =>
HasFS m h
-> HasBlockIO m h
-> Salt
-> UniqCounter m
-> TableConfig
-> ResolveSerialisedValue
-> ActionRegistry m
-> ActiveDir
-> SnapLevels (Ref (Run m h))
-> m (Levels m h)
fromSnapLevels HasFS m h
hfs HasBlockIO m h
hbio Salt
salt UniqCounter m
uc TableConfig
conf ResolveSerialisedValue
resolve ActionRegistry m
reg ActiveDir
activeDir SnapLevels (Ref (Run m h))
snapLevels'
(Ref (Run m h) -> m ()) -> SnapLevels (Ref (Run m h)) -> m ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ (ActionRegistry m -> m () -> m ()
forall (m :: * -> *).
(PrimMonad m, HasCallStack) =>
ActionRegistry m -> m () -> m ()
delayedCommit ActionRegistry m
reg (m () -> m ()) -> (Ref (Run m h) -> m ()) -> Ref (Run m h) -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Ref (Run m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef) SnapLevels (Ref (Run m h))
snapLevels'
LevelsCache m h
tableCache <- ActionRegistry m -> Levels m h -> m (LevelsCache m h)
forall (m :: * -> *) h.
(PrimMonad m, MonadMVar m, MonadMask m) =>
ActionRegistry m -> Levels m h -> m (LevelsCache m h)
mkLevelsCache ActionRegistry m
reg Levels m h
tableLevels
ActionRegistry m
-> Session m h
-> SessionEnv m h
-> TableConfig
-> ArenaManager (PrimState m)
-> Tracer m TableTrace
-> TableId
-> TableContent m h
-> m (Table m h)
forall (m :: * -> *) h.
(MonadSTM m, MonadMVar m, PrimMonad m) =>
ActionRegistry m
-> Session m h
-> SessionEnv m h
-> TableConfig
-> ArenaManager (PrimState m)
-> Tracer m TableTrace
-> TableId
-> TableContent m h
-> m (Table m h)
newWith ActionRegistry m
reg Session m h
sesh SessionEnv m h
seshEnv TableConfig
conf ArenaManager (PrimState m)
am Tracer m TableTrace
tr TableId
tableId (TableContent m h -> m (Table m h))
-> TableContent m h -> m (Table m h)
forall a b. (a -> b) -> a -> b
$! TableContent {
WriteBuffer
tableWriteBuffer :: WriteBuffer
tableWriteBuffer :: WriteBuffer
tableWriteBuffer
, Ref (WriteBufferBlobs m h)
tableWriteBufferBlobs :: Ref (WriteBufferBlobs m h)
tableWriteBufferBlobs :: Ref (WriteBufferBlobs m h)
tableWriteBufferBlobs
, Levels m h
tableLevels :: Levels m h
tableLevels :: Levels m h
tableLevels
, LevelsCache m h
tableCache :: LevelsCache m h
tableCache :: LevelsCache m h
tableCache
, tableUnionLevel :: UnionLevel m h
tableUnionLevel = UnionLevel m h
unionLevel
}
{-# SPECIALISE wrapFileCorruptedErrorAsSnapshotCorruptedError ::
SnapshotName
-> IO a
-> IO a
#-}
wrapFileCorruptedErrorAsSnapshotCorruptedError ::
forall m a.
(MonadCatch m)
=> SnapshotName
-> m a
-> m a
wrapFileCorruptedErrorAsSnapshotCorruptedError :: forall (m :: * -> *) a. MonadCatch m => SnapshotName -> m a -> m a
wrapFileCorruptedErrorAsSnapshotCorruptedError SnapshotName
snapshotName =
(FileCorruptedError -> SnapshotCorruptedError) -> m a -> m a
forall e1 e2 (m :: * -> *) a.
(Exception e1, Exception e2, MonadCatch m) =>
(e1 -> e2) -> m a -> m a
mapExceptionWithActionRegistry (SnapshotName -> FileCorruptedError -> SnapshotCorruptedError
ErrSnapshotCorrupted SnapshotName
snapshotName)
{-# SPECIALISE doesSnapshotExist ::
Session IO h
-> SnapshotName
-> IO Bool #-}
doesSnapshotExist ::
(MonadMask m, MonadSTM m)
=> Session m h
-> SnapshotName
-> m Bool
doesSnapshotExist :: forall (m :: * -> *) h.
(MonadMask m, MonadSTM m) =>
Session m h -> SnapshotName -> m Bool
doesSnapshotExist Session m h
sesh SnapshotName
snap = Session m h -> (SessionEnv m h -> m Bool) -> m Bool
forall (m :: * -> *) h a.
(MonadSTM m, MonadThrow m) =>
Session m h -> (SessionEnv m h -> m a) -> m a
withKeepSessionOpen Session m h
sesh (SnapshotName -> SessionEnv m h -> m Bool
forall (m :: * -> *) h. SnapshotName -> SessionEnv m h -> m Bool
doesSnapshotDirExist SnapshotName
snap)
doesSnapshotDirExist :: SnapshotName -> SessionEnv m h -> m Bool
doesSnapshotDirExist :: forall (m :: * -> *) h. SnapshotName -> SessionEnv m h -> m Bool
doesSnapshotDirExist SnapshotName
snap SessionEnv m h
seshEnv = do
let snapDir :: NamedSnapshotDir
snapDir = SessionRoot -> SnapshotName -> NamedSnapshotDir
Paths.namedSnapshotDir (SessionEnv m h -> SessionRoot
forall (m :: * -> *) h. SessionEnv m h -> SessionRoot
sessionRoot SessionEnv m h
seshEnv) SnapshotName
snap
HasFS m h -> HasCallStack => FsPath -> m Bool
forall (m :: * -> *) h.
HasFS m h -> HasCallStack => FsPath -> m Bool
FS.doesDirectoryExist (SessionEnv m h -> HasFS m h
forall (m :: * -> *) h. SessionEnv m h -> HasFS m h
sessionHasFS SessionEnv m h
seshEnv) (NamedSnapshotDir -> FsPath
Paths.getNamedSnapshotDir NamedSnapshotDir
snapDir)
{-# SPECIALISE deleteSnapshot ::
Session IO h
-> SnapshotName
-> IO () #-}
deleteSnapshot ::
(MonadMask m, MonadSTM m)
=> Session m h
-> SnapshotName
-> m ()
deleteSnapshot :: forall (m :: * -> *) h.
(MonadMask m, MonadSTM m) =>
Session m h -> SnapshotName -> m ()
deleteSnapshot Session m h
sesh SnapshotName
snap = do
Tracer m SessionTrace -> SessionTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith (Session m h -> Tracer m SessionTrace
forall (m :: * -> *) h. Session m h -> Tracer m SessionTrace
sessionTracer Session m h
sesh) (SessionTrace -> m ()) -> SessionTrace -> m ()
forall a b. (a -> b) -> a -> b
$ SnapshotName -> SessionTrace
TraceDeleteSnapshot SnapshotName
snap
Session m h -> (SessionEnv m h -> m ()) -> m ()
forall (m :: * -> *) h a.
(MonadSTM m, MonadThrow m) =>
Session m h -> (SessionEnv m h -> m a) -> m a
withKeepSessionOpen Session m h
sesh ((SessionEnv m h -> m ()) -> m ())
-> (SessionEnv m h -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \SessionEnv m h
seshEnv -> do
let snapDir :: NamedSnapshotDir
snapDir = SessionRoot -> SnapshotName -> NamedSnapshotDir
Paths.namedSnapshotDir (SessionEnv m h -> SessionRoot
forall (m :: * -> *) h. SessionEnv m h -> SessionRoot
sessionRoot SessionEnv m h
seshEnv) SnapshotName
snap
Bool
snapshotExists <- SnapshotName -> SessionEnv m h -> m Bool
forall (m :: * -> *) h. SnapshotName -> SessionEnv m h -> m Bool
doesSnapshotDirExist SnapshotName
snap SessionEnv m h
seshEnv
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
snapshotExists (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ SnapshotDoesNotExistError -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (SnapshotName -> SnapshotDoesNotExistError
ErrSnapshotDoesNotExist SnapshotName
snap)
HasFS m h -> HasCallStack => FsPath -> m ()
forall (m :: * -> *) h. HasFS m h -> HasCallStack => FsPath -> m ()
FS.removeDirectoryRecursive (SessionEnv m h -> HasFS m h
forall (m :: * -> *) h. SessionEnv m h -> HasFS m h
sessionHasFS SessionEnv m h
seshEnv) (NamedSnapshotDir -> FsPath
Paths.getNamedSnapshotDir NamedSnapshotDir
snapDir)
Tracer m SessionTrace -> SessionTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith (Session m h -> Tracer m SessionTrace
forall (m :: * -> *) h. Session m h -> Tracer m SessionTrace
sessionTracer Session m h
sesh) (SessionTrace -> m ()) -> SessionTrace -> m ()
forall a b. (a -> b) -> a -> b
$ SnapshotName -> SessionTrace
TraceDeletedSnapshot SnapshotName
snap
{-# SPECIALISE listSnapshots :: Session IO h -> IO [SnapshotName] #-}
listSnapshots ::
(MonadMask m, MonadSTM m)
=> Session m h
-> m [SnapshotName]
listSnapshots :: forall (m :: * -> *) h.
(MonadMask m, MonadSTM m) =>
Session m h -> m [SnapshotName]
listSnapshots Session m h
sesh = do
Tracer m SessionTrace -> SessionTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith (Session m h -> Tracer m SessionTrace
forall (m :: * -> *) h. Session m h -> Tracer m SessionTrace
sessionTracer Session m h
sesh) SessionTrace
TraceListSnapshots
Session m h
-> (SessionEnv m h -> m [SnapshotName]) -> m [SnapshotName]
forall (m :: * -> *) h a.
(MonadSTM m, MonadThrow m) =>
Session m h -> (SessionEnv m h -> m a) -> m a
withKeepSessionOpen Session m h
sesh ((SessionEnv m h -> m [SnapshotName]) -> m [SnapshotName])
-> (SessionEnv m h -> m [SnapshotName]) -> m [SnapshotName]
forall a b. (a -> b) -> a -> b
$ \SessionEnv m h
seshEnv -> do
let hfs :: HasFS m h
hfs = SessionEnv m h -> HasFS m h
forall (m :: * -> *) h. SessionEnv m h -> HasFS m h
sessionHasFS SessionEnv m h
seshEnv
root :: SessionRoot
root = SessionEnv m h -> SessionRoot
forall (m :: * -> *) h. SessionEnv m h -> SessionRoot
sessionRoot SessionEnv m h
seshEnv
Set String
contents <- HasFS m h -> HasCallStack => FsPath -> m (Set String)
forall (m :: * -> *) h.
HasFS m h -> HasCallStack => FsPath -> m (Set String)
FS.listDirectory HasFS m h
hfs (SessionRoot -> FsPath
Paths.snapshotsDir (SessionEnv m h -> SessionRoot
forall (m :: * -> *) h. SessionEnv m h -> SessionRoot
sessionRoot SessionEnv m h
seshEnv))
[Maybe SnapshotName]
snaps <- (String -> m (Maybe SnapshotName))
-> [String] -> m [Maybe SnapshotName]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> [a] -> m [b]
mapM (HasFS m h -> SessionRoot -> String -> m (Maybe SnapshotName)
forall {m :: * -> *} {h}.
Monad m =>
HasFS m h -> SessionRoot -> String -> m (Maybe SnapshotName)
checkSnapshot HasFS m h
hfs SessionRoot
root) ([String] -> m [Maybe SnapshotName])
-> [String] -> m [Maybe SnapshotName]
forall a b. (a -> b) -> a -> b
$ Set String -> [String]
forall a. Set a -> [a]
Set.toList Set String
contents
[SnapshotName] -> m [SnapshotName]
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ([SnapshotName] -> m [SnapshotName])
-> [SnapshotName] -> m [SnapshotName]
forall a b. (a -> b) -> a -> b
$ [Maybe SnapshotName] -> [SnapshotName]
forall a. [Maybe a] -> [a]
catMaybes [Maybe SnapshotName]
snaps
where
checkSnapshot :: HasFS m h -> SessionRoot -> String -> m (Maybe SnapshotName)
checkSnapshot HasFS m h
hfs SessionRoot
root String
s = do
let snap :: SnapshotName
snap = String -> SnapshotName
Paths.toSnapshotName String
s
Bool
b <- HasFS m h -> HasCallStack => FsPath -> m Bool
forall (m :: * -> *) h.
HasFS m h -> HasCallStack => FsPath -> m Bool
FS.doesDirectoryExist HasFS m h
hfs
(NamedSnapshotDir -> FsPath
Paths.getNamedSnapshotDir (NamedSnapshotDir -> FsPath) -> NamedSnapshotDir -> FsPath
forall a b. (a -> b) -> a -> b
$ SessionRoot -> SnapshotName -> NamedSnapshotDir
Paths.namedSnapshotDir SessionRoot
root SnapshotName
snap)
if Bool
b then Maybe SnapshotName -> m (Maybe SnapshotName)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe SnapshotName -> m (Maybe SnapshotName))
-> Maybe SnapshotName -> m (Maybe SnapshotName)
forall a b. (a -> b) -> a -> b
$ SnapshotName -> Maybe SnapshotName
forall a. a -> Maybe a
Just SnapshotName
snap
else Maybe SnapshotName -> m (Maybe SnapshotName)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe SnapshotName -> m (Maybe SnapshotName))
-> Maybe SnapshotName -> m (Maybe SnapshotName)
forall a b. (a -> b) -> a -> b
$ Maybe SnapshotName
forall a. Maybe a
Nothing
{-# SPECIALISE duplicate :: Table IO h -> IO (Table IO h) #-}
duplicate ::
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m)
=> Table m h
-> m (Table m h)
duplicate :: forall (m :: * -> *) h.
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m) =>
Table m h -> m (Table m h)
duplicate t :: Table m h
t@Table{Tracer m TableTrace
RWVar m (TableState m h)
ArenaManager (PrimState m)
TableId
TableConfig
Session m h
tableConfig :: forall (m :: * -> *) h. Table m h -> TableConfig
tableState :: forall (m :: * -> *) h. Table m h -> RWVar m (TableState m h)
tableArenaManager :: forall (m :: * -> *) h. Table m h -> ArenaManager (PrimState m)
tableTracer :: forall (m :: * -> *) h. Table m h -> Tracer m TableTrace
tableId :: forall (m :: * -> *) h. Table m h -> TableId
tableSession :: forall (m :: * -> *) h. Table m h -> Session m h
tableConfig :: TableConfig
tableState :: RWVar m (TableState m h)
tableArenaManager :: ArenaManager (PrimState m)
tableTracer :: Tracer m TableTrace
tableId :: TableId
tableSession :: Session m h
..} = do
TableId
childTableId <- Unique -> TableId
uniqueToTableId (Unique -> TableId) -> m Unique -> m TableId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> UniqCounter m -> m Unique
forall (m :: * -> *). PrimMonad m => UniqCounter m -> m Unique
incrUniqCounter (Table m h -> UniqCounter m
forall (m :: * -> *) h. Table m h -> UniqCounter m
tableSessionUniqCounter Table m h
t)
let childTableTracer :: Tracer m TableTrace
childTableTracer = TableId -> TableTrace -> LSMTreeTrace
TraceTable TableId
childTableId (TableTrace -> LSMTreeTrace)
-> Tracer m LSMTreeTrace -> Tracer m TableTrace
forall a' a. (a' -> a) -> Tracer m a -> Tracer m a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
`contramap` Session m h -> Tracer m LSMTreeTrace
forall (m :: * -> *) h. Session m h -> Tracer m LSMTreeTrace
lsmTreeTracer Session m h
tableSession
parentTableId :: TableId
parentTableId = TableId
tableId
Tracer m TableTrace -> TableTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith Tracer m TableTrace
childTableTracer (TableTrace -> m ()) -> TableTrace -> m ()
forall a b. (a -> b) -> a -> b
$ TableId -> TableTrace
TraceDuplicate TableId
parentTableId
Table m h -> (TableEnv m h -> m (Table m h)) -> m (Table m h)
forall (m :: * -> *) h a.
(MonadSTM m, MonadThrow m) =>
Table m h -> (TableEnv m h -> m a) -> m a
withKeepTableOpen Table m h
t ((TableEnv m h -> m (Table m h)) -> m (Table m h))
-> (TableEnv m h -> m (Table m h)) -> m (Table m h)
forall a b. (a -> b) -> a -> b
$ \TableEnv{RWVar m (TableContent m h)
SessionEnv m h
tableSessionEnv :: forall (m :: * -> *) h. TableEnv m h -> SessionEnv m h
tableContent :: forall (m :: * -> *) h. TableEnv m h -> RWVar m (TableContent m h)
tableSessionEnv :: SessionEnv m h
tableContent :: RWVar m (TableContent m h)
..} -> do
Session m h -> (SessionEnv m h -> m (Table m h)) -> m (Table m h)
forall (m :: * -> *) h a.
(MonadSTM m, MonadThrow m) =>
Session m h -> (SessionEnv m h -> m a) -> m a
withKeepSessionOpen Session m h
tableSession ((SessionEnv m h -> m (Table m h)) -> m (Table m h))
-> (SessionEnv m h -> m (Table m h)) -> m (Table m h)
forall a b. (a -> b) -> a -> b
$ \SessionEnv m h
_ -> do
(ActionRegistry m -> m (Table m h)) -> m (Table m h)
forall (m :: * -> *) a.
(PrimMonad m, MonadCatch m) =>
(ActionRegistry m -> m a) -> m a
withActionRegistry ((ActionRegistry m -> m (Table m h)) -> m (Table m h))
-> (ActionRegistry m -> m (Table m h)) -> m (Table m h)
forall a b. (a -> b) -> a -> b
$ \ActionRegistry m
reg -> do
TableContent m h
content <- RWVar m (TableContent m h)
-> (TableContent m h -> m (TableContent m h))
-> m (TableContent m h)
forall (m :: * -> *) a b.
(MonadSTM m, MonadThrow m) =>
RWVar m a -> (a -> m b) -> m b
RW.withReadAccess RWVar m (TableContent m h)
tableContent (ActionRegistry m -> TableContent m h -> m (TableContent m h)
forall (m :: * -> *) h.
(PrimMonad m, MonadMask m) =>
ActionRegistry m -> TableContent m h -> m (TableContent m h)
duplicateTableContent ActionRegistry m
reg)
ActionRegistry m
-> Session m h
-> SessionEnv m h
-> TableConfig
-> ArenaManager (PrimState m)
-> Tracer m TableTrace
-> TableId
-> TableContent m h
-> m (Table m h)
forall (m :: * -> *) h.
(MonadSTM m, MonadMVar m, PrimMonad m) =>
ActionRegistry m
-> Session m h
-> SessionEnv m h
-> TableConfig
-> ArenaManager (PrimState m)
-> Tracer m TableTrace
-> TableId
-> TableContent m h
-> m (Table m h)
newWith
ActionRegistry m
reg
Session m h
tableSession
SessionEnv m h
tableSessionEnv
TableConfig
tableConfig
ArenaManager (PrimState m)
tableArenaManager
Tracer m TableTrace
childTableTracer
TableId
childTableId
TableContent m h
content
data TableUnionNotCompatibleError
= ErrTableUnionHandleTypeMismatch
!Int
!TypeRep
!Int
!TypeRep
| ErrTableUnionSessionMismatch
!Int
!FsErrorPath
!Int
!FsErrorPath
deriving stock (Int -> TableUnionNotCompatibleError -> ShowS
[TableUnionNotCompatibleError] -> ShowS
TableUnionNotCompatibleError -> String
(Int -> TableUnionNotCompatibleError -> ShowS)
-> (TableUnionNotCompatibleError -> String)
-> ([TableUnionNotCompatibleError] -> ShowS)
-> Show TableUnionNotCompatibleError
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> TableUnionNotCompatibleError -> ShowS
showsPrec :: Int -> TableUnionNotCompatibleError -> ShowS
$cshow :: TableUnionNotCompatibleError -> String
show :: TableUnionNotCompatibleError -> String
$cshowList :: [TableUnionNotCompatibleError] -> ShowS
showList :: [TableUnionNotCompatibleError] -> ShowS
Show, TableUnionNotCompatibleError
-> TableUnionNotCompatibleError -> Bool
(TableUnionNotCompatibleError
-> TableUnionNotCompatibleError -> Bool)
-> (TableUnionNotCompatibleError
-> TableUnionNotCompatibleError -> Bool)
-> Eq TableUnionNotCompatibleError
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: TableUnionNotCompatibleError
-> TableUnionNotCompatibleError -> Bool
== :: TableUnionNotCompatibleError
-> TableUnionNotCompatibleError -> Bool
$c/= :: TableUnionNotCompatibleError
-> TableUnionNotCompatibleError -> Bool
/= :: TableUnionNotCompatibleError
-> TableUnionNotCompatibleError -> Bool
Eq)
deriving anyclass (Show TableUnionNotCompatibleError
Typeable TableUnionNotCompatibleError
(Typeable TableUnionNotCompatibleError,
Show TableUnionNotCompatibleError) =>
(TableUnionNotCompatibleError -> SomeException)
-> (SomeException -> Maybe TableUnionNotCompatibleError)
-> (TableUnionNotCompatibleError -> String)
-> Exception TableUnionNotCompatibleError
SomeException -> Maybe TableUnionNotCompatibleError
TableUnionNotCompatibleError -> String
TableUnionNotCompatibleError -> SomeException
forall e.
(Typeable e, Show e) =>
(e -> SomeException)
-> (SomeException -> Maybe e) -> (e -> String) -> Exception e
$ctoException :: TableUnionNotCompatibleError -> SomeException
toException :: TableUnionNotCompatibleError -> SomeException
$cfromException :: SomeException -> Maybe TableUnionNotCompatibleError
fromException :: SomeException -> Maybe TableUnionNotCompatibleError
$cdisplayException :: TableUnionNotCompatibleError -> String
displayException :: TableUnionNotCompatibleError -> String
Exception)
{-# SPECIALISE unions :: NonEmpty (Table IO h) -> IO (Table IO h) #-}
unions ::
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m)
=> NonEmpty (Table m h)
-> m (Table m h)
unions :: forall (m :: * -> *) h.
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m) =>
NonEmpty (Table m h) -> m (Table m h)
unions NonEmpty (Table m h)
ts = do
Session m h
sesh <- NonEmpty (Table m h) -> m (Session m h)
forall (m :: * -> *) h.
(MonadSTM m, MonadThrow m) =>
NonEmpty (Table m h) -> m (Session m h)
ensureSessionsMatch NonEmpty (Table m h)
ts
TableId
childTableId <- Unique -> TableId
uniqueToTableId (Unique -> TableId) -> m Unique -> m TableId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> UniqCounter m -> m Unique
forall (m :: * -> *). PrimMonad m => UniqCounter m -> m Unique
incrUniqCounter (Session m h -> UniqCounter m
forall (m :: * -> *) h. Session m h -> UniqCounter m
sessionUniqCounter Session m h
sesh)
let childTableTracer :: Tracer m TableTrace
childTableTracer = TableId -> TableTrace -> LSMTreeTrace
TraceTable TableId
childTableId (TableTrace -> LSMTreeTrace)
-> Tracer m LSMTreeTrace -> Tracer m TableTrace
forall a' a. (a' -> a) -> Tracer m a -> Tracer m a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
`contramap` Session m h -> Tracer m LSMTreeTrace
forall (m :: * -> *) h. Session m h -> Tracer m LSMTreeTrace
lsmTreeTracer Session m h
sesh
Tracer m TableTrace -> TableTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith Tracer m TableTrace
childTableTracer (TableTrace -> m ()) -> TableTrace -> m ()
forall a b. (a -> b) -> a -> b
$ NonEmpty TableId -> TableTrace
TraceIncrementalUnions ((Table m h -> TableId) -> NonEmpty (Table m h) -> NonEmpty TableId
forall a b. (a -> b) -> NonEmpty a -> NonEmpty b
NE.map Table m h -> TableId
forall (m :: * -> *) h. Table m h -> TableId
tableId NonEmpty (Table m h)
ts)
let conf :: TableConfig
conf = Table m h -> TableConfig
forall (m :: * -> *) h. Table m h -> TableConfig
tableConfig (NonEmpty (Table m h) -> Table m h
forall a. NonEmpty a -> a
NE.last NonEmpty (Table m h)
ts)
m (SessionState m h)
-> (SessionState m h -> m ())
-> (ActionRegistry m
-> SessionState m h -> m (SessionState m h, Table m h))
-> m (Table m h)
forall (m :: * -> *) st a.
(PrimMonad m, MonadCatch m) =>
m st
-> (st -> m ()) -> (ActionRegistry m -> st -> m (st, a)) -> m a
modifyWithActionRegistry
(STM m (SessionState m h) -> m (SessionState m h)
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m (SessionState m h) -> m (SessionState m h))
-> STM m (SessionState m h) -> m (SessionState m h)
forall a b. (a -> b) -> a -> b
$ RWVar m (SessionState m h) -> STM m (SessionState m h)
forall (m :: * -> *) a. MonadSTM m => RWVar m a -> STM m a
RW.unsafeAcquireReadAccess (Session m h -> RWVar m (SessionState m h)
forall (m :: * -> *) h. Session m h -> RWVar m (SessionState m h)
sessionState Session m h
sesh))
(\SessionState m h
_ -> STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ RWVar m (SessionState m h) -> STM m ()
forall (m :: * -> *) a. MonadSTM m => RWVar m a -> STM m ()
RW.unsafeReleaseReadAccess (Session m h -> RWVar m (SessionState m h)
forall (m :: * -> *) h. Session m h -> RWVar m (SessionState m h)
sessionState Session m h
sesh)) ((ActionRegistry m
-> SessionState m h -> m (SessionState m h, Table m h))
-> m (Table m h))
-> (ActionRegistry m
-> SessionState m h -> m (SessionState m h, Table m h))
-> m (Table m h)
forall a b. (a -> b) -> a -> b
$
\ActionRegistry m
reg -> \case
SessionState m h
SessionClosed -> SessionClosedError -> m (SessionState m h, Table m h)
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO SessionClosedError
ErrSessionClosed
seshState :: SessionState m h
seshState@(SessionOpen SessionEnv m h
seshEnv) -> do
Table m h
t <- ActionRegistry m
-> Session m h
-> SessionEnv m h
-> TableConfig
-> Tracer m TableTrace
-> TableId
-> NonEmpty (Table m h)
-> m (Table m h)
forall (m :: * -> *) h.
(MonadSTM m, MonadMask m, MonadMVar m, MonadST m) =>
ActionRegistry m
-> Session m h
-> SessionEnv m h
-> TableConfig
-> Tracer m TableTrace
-> TableId
-> NonEmpty (Table m h)
-> m (Table m h)
unionsInOpenSession ActionRegistry m
reg Session m h
sesh SessionEnv m h
seshEnv TableConfig
conf Tracer m TableTrace
childTableTracer TableId
childTableId NonEmpty (Table m h)
ts
(SessionState m h, Table m h) -> m (SessionState m h, Table m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (SessionState m h
seshState, Table m h
t)
{-# SPECIALISE unionsInOpenSession ::
ActionRegistry IO
-> Session IO h
-> SessionEnv IO h
-> TableConfig
-> Tracer IO TableTrace
-> TableId
-> NonEmpty (Table IO h)
-> IO (Table IO h) #-}
unionsInOpenSession ::
(MonadSTM m, MonadMask m, MonadMVar m, MonadST m)
=> ActionRegistry m
-> Session m h
-> SessionEnv m h
-> TableConfig
-> Tracer m TableTrace
-> TableId
-> NonEmpty (Table m h)
-> m (Table m h)
unionsInOpenSession :: forall (m :: * -> *) h.
(MonadSTM m, MonadMask m, MonadMVar m, MonadST m) =>
ActionRegistry m
-> Session m h
-> SessionEnv m h
-> TableConfig
-> Tracer m TableTrace
-> TableId
-> NonEmpty (Table m h)
-> m (Table m h)
unionsInOpenSession ActionRegistry m
reg Session m h
sesh SessionEnv m h
seshEnv TableConfig
conf Tracer m TableTrace
tr !TableId
tableId NonEmpty (Table m h)
ts = do
[Ref (MergingTree m h)]
mts <- [Table m h]
-> (Table m h -> m (Ref (MergingTree m h)))
-> m [Ref (MergingTree m h)]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM (NonEmpty (Table m h) -> [Table m h]
forall a. NonEmpty a -> [a]
NE.toList NonEmpty (Table m h)
ts) ((Table m h -> m (Ref (MergingTree m h)))
-> m [Ref (MergingTree m h)])
-> (Table m h -> m (Ref (MergingTree m h)))
-> m [Ref (MergingTree m h)]
forall a b. (a -> b) -> a -> b
$ \Table m h
t ->
Table m h
-> (TableEnv m h -> m (Ref (MergingTree m h)))
-> m (Ref (MergingTree m h))
forall (m :: * -> *) h a.
(MonadSTM m, MonadThrow m) =>
Table m h -> (TableEnv m h -> m a) -> m a
withKeepTableOpen Table m h
t ((TableEnv m h -> m (Ref (MergingTree m h)))
-> m (Ref (MergingTree m h)))
-> (TableEnv m h -> m (Ref (MergingTree m h)))
-> m (Ref (MergingTree m h))
forall a b. (a -> b) -> a -> b
$ \TableEnv m h
tEnv ->
RWVar m (TableContent m h)
-> (TableContent m h -> m (Ref (MergingTree m h)))
-> m (Ref (MergingTree m h))
forall (m :: * -> *) a b.
(MonadSTM m, MonadThrow m) =>
RWVar m a -> (a -> m b) -> m b
RW.withReadAccess (TableEnv m h -> RWVar m (TableContent m h)
forall (m :: * -> *) h. TableEnv m h -> RWVar m (TableContent m h)
tableContent TableEnv m h
tEnv) ((TableContent m h -> m (Ref (MergingTree m h)))
-> m (Ref (MergingTree m h)))
-> (TableContent m h -> m (Ref (MergingTree m h)))
-> m (Ref (MergingTree m h))
forall a b. (a -> b) -> a -> b
$ \TableContent m h
tc ->
ActionRegistry m
-> m (Ref (MergingTree m h))
-> (Ref (MergingTree m h) -> m ())
-> m (Ref (MergingTree m h))
forall (m :: * -> *) a.
(PrimMonad m, MonadMask m, HasCallStack) =>
ActionRegistry m -> m a -> (a -> m ()) -> m a
withRollback ActionRegistry m
reg
(UniqCounter m
-> SessionEnv m h
-> TableConfig
-> TableContent m h
-> m (Ref (MergingTree m h))
forall (m :: * -> *) h.
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m) =>
UniqCounter m
-> SessionEnv m h
-> TableConfig
-> TableContent m h
-> m (Ref (MergingTree m h))
tableContentToMergingTree (Session m h -> UniqCounter m
forall (m :: * -> *) h. Session m h -> UniqCounter m
sessionUniqCounter Session m h
sesh) SessionEnv m h
seshEnv TableConfig
conf TableContent m h
tc)
Ref (MergingTree m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef
Ref (MergingTree m h)
mt <- ActionRegistry m
-> m (Ref (MergingTree m h))
-> (Ref (MergingTree m h) -> m ())
-> m (Ref (MergingTree m h))
forall (m :: * -> *) a.
(PrimMonad m, MonadMask m, HasCallStack) =>
ActionRegistry m -> m a -> (a -> m ()) -> m a
withRollback ActionRegistry m
reg ([Ref (MergingTree m h)] -> m (Ref (MergingTree m h))
forall (m :: * -> *) h.
(MonadMVar m, MonadMask m, PrimMonad m) =>
[Ref (MergingTree m h)] -> m (Ref (MergingTree m h))
newPendingUnionMerge [Ref (MergingTree m h)]
mts) Ref (MergingTree m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef
[Ref (MergingTree m h)] -> (Ref (MergingTree m h) -> m ()) -> m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [Ref (MergingTree m h)]
mts (ActionRegistry m -> m () -> m ()
forall (m :: * -> *).
(PrimMonad m, HasCallStack) =>
ActionRegistry m -> m () -> m ()
delayedCommit ActionRegistry m
reg (m () -> m ())
-> (Ref (MergingTree m h) -> m ()) -> Ref (MergingTree m h) -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Ref (MergingTree m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef)
TableContent m h
content <- Ref (MergingTree m h) -> m Bool
forall (m :: * -> *) h.
MonadMVar m =>
Ref (MergingTree m h) -> m Bool
MT.isStructurallyEmpty Ref (MergingTree m h)
mt m Bool -> (Bool -> m (TableContent m h)) -> m (TableContent m h)
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Bool
True -> do
ActionRegistry m -> m () -> m ()
forall (m :: * -> *).
(PrimMonad m, HasCallStack) =>
ActionRegistry m -> m () -> m ()
delayedCommit ActionRegistry m
reg (Ref (MergingTree m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef Ref (MergingTree m h)
mt)
UniqCounter m
-> SessionEnv m h -> ActionRegistry m -> m (TableContent m h)
forall (m :: * -> *) h.
(PrimMonad m, MonadMask m, MonadMVar m) =>
UniqCounter m
-> SessionEnv m h -> ActionRegistry m -> m (TableContent m h)
newEmptyTableContent ((Session m h -> UniqCounter m
forall (m :: * -> *) h. Session m h -> UniqCounter m
sessionUniqCounter Session m h
sesh)) SessionEnv m h
seshEnv ActionRegistry m
reg
Bool
False -> do
TableContent m h
empty <- UniqCounter m
-> SessionEnv m h -> ActionRegistry m -> m (TableContent m h)
forall (m :: * -> *) h.
(PrimMonad m, MonadMask m, MonadMVar m) =>
UniqCounter m
-> SessionEnv m h -> ActionRegistry m -> m (TableContent m h)
newEmptyTableContent (Session m h -> UniqCounter m
forall (m :: * -> *) h. Session m h -> UniqCounter m
sessionUniqCounter Session m h
sesh) SessionEnv m h
seshEnv ActionRegistry m
reg
UnionCache m h
cache <- ActionRegistry m -> Ref (MergingTree m h) -> m (UnionCache m h)
forall (m :: * -> *) h.
(PrimMonad m, MonadMVar m, MonadMask m) =>
ActionRegistry m -> Ref (MergingTree m h) -> m (UnionCache m h)
mkUnionCache ActionRegistry m
reg Ref (MergingTree m h)
mt
TableContent m h -> m (TableContent m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure TableContent m h
empty { tableUnionLevel = Union mt cache }
let am :: ArenaManager (PrimState m)
am = Table m h -> ArenaManager (PrimState m)
forall (m :: * -> *) h. Table m h -> ArenaManager (PrimState m)
tableArenaManager (NonEmpty (Table m h) -> Table m h
forall a. NonEmpty a -> a
NE.last NonEmpty (Table m h)
ts)
ActionRegistry m
-> Session m h
-> SessionEnv m h
-> TableConfig
-> ArenaManager (PrimState m)
-> Tracer m TableTrace
-> TableId
-> TableContent m h
-> m (Table m h)
forall (m :: * -> *) h.
(MonadSTM m, MonadMVar m, PrimMonad m) =>
ActionRegistry m
-> Session m h
-> SessionEnv m h
-> TableConfig
-> ArenaManager (PrimState m)
-> Tracer m TableTrace
-> TableId
-> TableContent m h
-> m (Table m h)
newWith ActionRegistry m
reg Session m h
sesh SessionEnv m h
seshEnv TableConfig
conf ArenaManager (PrimState m)
am Tracer m TableTrace
tr TableId
tableId TableContent m h
content
{-# SPECIALISE tableContentToMergingTree ::
UniqCounter IO
-> SessionEnv IO h
-> TableConfig
-> TableContent IO h
-> IO (Ref (MergingTree IO h)) #-}
tableContentToMergingTree ::
forall m h.
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m)
=> UniqCounter m
-> SessionEnv m h
-> TableConfig
-> TableContent m h
-> m (Ref (MergingTree m h))
tableContentToMergingTree :: forall (m :: * -> *) h.
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m) =>
UniqCounter m
-> SessionEnv m h
-> TableConfig
-> TableContent m h
-> m (Ref (MergingTree m h))
tableContentToMergingTree UniqCounter m
uc SessionEnv m h
seshEnv TableConfig
conf
tc :: TableContent m h
tc@TableContent {
Levels m h
tableLevels :: forall (m :: * -> *) h. TableContent m h -> Levels m h
tableLevels :: Levels m h
tableLevels,
UnionLevel m h
tableUnionLevel :: forall (m :: * -> *) h. TableContent m h -> UnionLevel m h
tableUnionLevel :: UnionLevel m h
tableUnionLevel
} =
m (Maybe (Ref (Run m h)))
-> (Maybe (Ref (Run m h)) -> m ())
-> (Maybe (Ref (Run m h)) -> m (Ref (MergingTree m h)))
-> m (Ref (MergingTree m h))
forall a b c. m a -> (a -> m b) -> (a -> m c) -> m c
forall (m :: * -> *) a b c.
MonadThrow m =>
m a -> (a -> m b) -> (a -> m c) -> m c
bracket (UniqCounter m
-> SessionEnv m h
-> TableConfig
-> TableContent m h
-> m (Maybe (Ref (Run m h)))
forall (m :: * -> *) h.
(MonadMask m, MonadST m, MonadSTM m) =>
UniqCounter m
-> SessionEnv m h
-> TableConfig
-> TableContent m h
-> m (Maybe (Ref (Run m h)))
writeBufferToNewRun UniqCounter m
uc SessionEnv m h
seshEnv TableConfig
conf TableContent m h
tc)
((Ref (Run m h) -> m ()) -> Maybe (Ref (Run m h)) -> m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ Ref (Run m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef) ((Maybe (Ref (Run m h)) -> m (Ref (MergingTree m h)))
-> m (Ref (MergingTree m h)))
-> (Maybe (Ref (Run m h)) -> m (Ref (MergingTree m h)))
-> m (Ref (MergingTree m h))
forall a b. (a -> b) -> a -> b
$ \Maybe (Ref (Run m h))
mwbr ->
let runs :: [PreExistingRun m h]
runs :: [PreExistingRun m h]
runs = Maybe (PreExistingRun m h) -> [PreExistingRun m h]
forall a. Maybe a -> [a]
maybeToList ((Ref (Run m h) -> PreExistingRun m h)
-> Maybe (Ref (Run m h)) -> Maybe (PreExistingRun m h)
forall a b. (a -> b) -> Maybe a -> Maybe b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Ref (Run m h) -> PreExistingRun m h
forall (m :: * -> *) h. Ref (Run m h) -> PreExistingRun m h
PreExistingRun Maybe (Ref (Run m h))
mwbr)
[PreExistingRun m h]
-> [PreExistingRun m h] -> [PreExistingRun m h]
forall a. [a] -> [a] -> [a]
++ (Level m h -> [PreExistingRun m h])
-> [Level m h] -> [PreExistingRun m h]
forall (t :: * -> *) a b. Foldable t => (a -> [b]) -> t a -> [b]
concatMap Level m h -> [PreExistingRun m h]
levelToPreExistingRuns (Levels m h -> [Level m h]
forall a. Vector a -> [a]
V.toList Levels m h
tableLevels)
unionmt :: Maybe (Ref (MergingTree m h))
unionmt = case UnionLevel m h
tableUnionLevel of
UnionLevel m h
NoUnion -> Maybe (Ref (MergingTree m h))
forall a. Maybe a
Nothing
Union Ref (MergingTree m h)
mt UnionCache m h
_ -> Ref (MergingTree m h) -> Maybe (Ref (MergingTree m h))
forall a. a -> Maybe a
Just Ref (MergingTree m h)
mt
in [PreExistingRun m h]
-> Maybe (Ref (MergingTree m h)) -> m (Ref (MergingTree m h))
forall (m :: * -> *) h.
(MonadMVar m, MonadMask m, PrimMonad m) =>
[PreExistingRun m h]
-> Maybe (Ref (MergingTree m h)) -> m (Ref (MergingTree m h))
newPendingLevelMerge [PreExistingRun m h]
runs Maybe (Ref (MergingTree m h))
unionmt
where
levelToPreExistingRuns :: Level m h -> [PreExistingRun m h]
levelToPreExistingRuns :: Level m h -> [PreExistingRun m h]
levelToPreExistingRuns Level{IncomingRun m h
incomingRun :: IncomingRun m h
incomingRun :: forall (m :: * -> *) h. Level m h -> IncomingRun m h
incomingRun, Vector (Ref (Run m h))
residentRuns :: Vector (Ref (Run m h))
residentRuns :: forall (m :: * -> *) h. Level m h -> Vector (Ref (Run m h))
residentRuns} =
case IncomingRun m h
incomingRun of
Single Ref (Run m h)
r -> Ref (Run m h) -> PreExistingRun m h
forall (m :: * -> *) h. Ref (Run m h) -> PreExistingRun m h
PreExistingRun Ref (Run m h)
r
Merging MergePolicyForLevel
_ NominalDebt
_ PrimVar (PrimState m) NominalCredits
_ Ref (MergingRun LevelMergeType m h)
mr -> Ref (MergingRun LevelMergeType m h) -> PreExistingRun m h
forall (m :: * -> *) h.
Ref (MergingRun LevelMergeType m h) -> PreExistingRun m h
PreExistingMergingRun Ref (MergingRun LevelMergeType m h)
mr
PreExistingRun m h -> [PreExistingRun m h] -> [PreExistingRun m h]
forall a. a -> [a] -> [a]
: (Ref (Run m h) -> PreExistingRun m h)
-> [Ref (Run m h)] -> [PreExistingRun m h]
forall a b. (a -> b) -> [a] -> [b]
map Ref (Run m h) -> PreExistingRun m h
forall (m :: * -> *) h. Ref (Run m h) -> PreExistingRun m h
PreExistingRun (Vector (Ref (Run m h)) -> [Ref (Run m h)]
forall a. Vector a -> [a]
V.toList Vector (Ref (Run m h))
residentRuns)
{-# SPECIALISE writeBufferToNewRun ::
UniqCounter IO
-> SessionEnv IO h
-> TableConfig
-> TableContent IO h
-> IO (Maybe (Ref (Run IO h))) #-}
writeBufferToNewRun ::
(MonadMask m, MonadST m, MonadSTM m)
=> UniqCounter m
-> SessionEnv m h
-> TableConfig
-> TableContent m h
-> m (Maybe (Ref (Run m h)))
writeBufferToNewRun :: forall (m :: * -> *) h.
(MonadMask m, MonadST m, MonadSTM m) =>
UniqCounter m
-> SessionEnv m h
-> TableConfig
-> TableContent m h
-> m (Maybe (Ref (Run m h)))
writeBufferToNewRun UniqCounter m
uc
SessionEnv {
sessionRoot :: forall (m :: * -> *) h. SessionEnv m h -> SessionRoot
sessionRoot = SessionRoot
root,
sessionSalt :: forall (m :: * -> *) h. SessionEnv m h -> Salt
sessionSalt = Salt
salt,
sessionHasFS :: forall (m :: * -> *) h. SessionEnv m h -> HasFS m h
sessionHasFS = HasFS m h
hfs,
sessionHasBlockIO :: forall (m :: * -> *) h. SessionEnv m h -> HasBlockIO m h
sessionHasBlockIO = HasBlockIO m h
hbio
}
TableConfig
conf
TableContent{
WriteBuffer
tableWriteBuffer :: forall (m :: * -> *) h. TableContent m h -> WriteBuffer
tableWriteBuffer :: WriteBuffer
tableWriteBuffer,
Ref (WriteBufferBlobs m h)
tableWriteBufferBlobs :: forall (m :: * -> *) h.
TableContent m h -> Ref (WriteBufferBlobs m h)
tableWriteBufferBlobs :: Ref (WriteBufferBlobs m h)
tableWriteBufferBlobs
}
| WriteBuffer -> Bool
WB.null WriteBuffer
tableWriteBuffer = Maybe (Ref (Run m h)) -> m (Maybe (Ref (Run m h)))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe (Ref (Run m h))
forall a. Maybe a
Nothing
| Bool
otherwise = Ref (Run m h) -> Maybe (Ref (Run m h))
forall a. a -> Maybe a
Just (Ref (Run m h) -> Maybe (Ref (Run m h)))
-> m (Ref (Run m h)) -> m (Maybe (Ref (Run m h)))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> do
!Unique
uniq <- UniqCounter m -> m Unique
forall (m :: * -> *). PrimMonad m => UniqCounter m -> m Unique
incrUniqCounter UniqCounter m
uc
let (!RunParams
runParams, !RunFsPaths
runPaths) = ActiveDir
-> TableConfig -> Unique -> LevelNo -> (RunParams, RunFsPaths)
mergingRunParamsForLevel
(SessionRoot -> ActiveDir
Paths.activeDir SessionRoot
root) TableConfig
conf Unique
uniq (Int -> LevelNo
LevelNo Int
1)
HasFS m h
-> HasBlockIO m h
-> Salt
-> RunParams
-> RunFsPaths
-> WriteBuffer
-> Ref (WriteBufferBlobs m h)
-> m (Ref (Run m h))
forall (m :: * -> *) h.
(MonadST m, MonadSTM m, MonadMask m) =>
HasFS m h
-> HasBlockIO m h
-> Salt
-> RunParams
-> RunFsPaths
-> WriteBuffer
-> Ref (WriteBufferBlobs m h)
-> m (Ref (Run m h))
Run.fromWriteBuffer
HasFS m h
hfs HasBlockIO m h
hbio Salt
salt
RunParams
runParams RunFsPaths
runPaths
WriteBuffer
tableWriteBuffer
Ref (WriteBufferBlobs m h)
tableWriteBufferBlobs
{-# SPECIALISE ensureSessionsMatch ::
NonEmpty (Table IO h)
-> IO (Session IO h) #-}
ensureSessionsMatch ::
(MonadSTM m, MonadThrow m)
=> NonEmpty (Table m h)
-> m (Session m h)
ensureSessionsMatch :: forall (m :: * -> *) h.
(MonadSTM m, MonadThrow m) =>
NonEmpty (Table m h) -> m (Session m h)
ensureSessionsMatch (Table m h
t :| [Table m h]
ts) = do
let sesh :: Session m h
sesh = Table m h -> Session m h
forall (m :: * -> *) h. Table m h -> Session m h
tableSession Table m h
t
Session m h
-> (SessionEnv m h -> m (Session m h)) -> m (Session m h)
forall (m :: * -> *) h a.
(MonadSTM m, MonadThrow m) =>
Session m h -> (SessionEnv m h -> m a) -> m a
withKeepSessionOpen Session m h
sesh ((SessionEnv m h -> m (Session m h)) -> m (Session m h))
-> (SessionEnv m h -> m (Session m h)) -> m (Session m h)
forall a b. (a -> b) -> a -> b
$ \SessionEnv m h
seshEnv -> do
let root :: FsErrorPath
root = HasFS m h -> FsPath -> FsErrorPath
forall (m :: * -> *) h. HasFS m h -> FsPath -> FsErrorPath
FS.mkFsErrorPath (SessionEnv m h -> HasFS m h
forall (m :: * -> *) h. SessionEnv m h -> HasFS m h
sessionHasFS SessionEnv m h
seshEnv) (SessionRoot -> FsPath
getSessionRoot (SessionEnv m h -> SessionRoot
forall (m :: * -> *) h. SessionEnv m h -> SessionRoot
sessionRoot SessionEnv m h
seshEnv))
[(Int, Table m h)] -> ((Int, Table m h) -> m ()) -> m ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ ([Int] -> [Table m h] -> [(Int, Table m h)]
forall a b. [a] -> [b] -> [(a, b)]
zip [Int
1..] [Table m h]
ts) (((Int, Table m h) -> m ()) -> m ())
-> ((Int, Table m h) -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \(Int
i, Table m h
t') -> do
let sesh' :: Session m h
sesh' = Table m h -> Session m h
forall (m :: * -> *) h. Table m h -> Session m h
tableSession Table m h
t'
Session m h -> (SessionEnv m h -> m ()) -> m ()
forall (m :: * -> *) h a.
(MonadSTM m, MonadThrow m) =>
Session m h -> (SessionEnv m h -> m a) -> m a
withKeepSessionOpen Session m h
sesh' ((SessionEnv m h -> m ()) -> m ())
-> (SessionEnv m h -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \SessionEnv m h
seshEnv' -> do
let root' :: FsErrorPath
root' = HasFS m h -> FsPath -> FsErrorPath
forall (m :: * -> *) h. HasFS m h -> FsPath -> FsErrorPath
FS.mkFsErrorPath (SessionEnv m h -> HasFS m h
forall (m :: * -> *) h. SessionEnv m h -> HasFS m h
sessionHasFS SessionEnv m h
seshEnv') (SessionRoot -> FsPath
getSessionRoot (SessionEnv m h -> SessionRoot
forall (m :: * -> *) h. SessionEnv m h -> SessionRoot
sessionRoot SessionEnv m h
seshEnv'))
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (FsErrorPath
root FsErrorPath -> FsErrorPath -> Bool
forall a. Eq a => a -> a -> Bool
== FsErrorPath
root') (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ TableUnionNotCompatibleError -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (TableUnionNotCompatibleError -> m ())
-> TableUnionNotCompatibleError -> m ()
forall a b. (a -> b) -> a -> b
$ Int
-> FsErrorPath
-> Int
-> FsErrorPath
-> TableUnionNotCompatibleError
ErrTableUnionSessionMismatch Int
0 FsErrorPath
root Int
i FsErrorPath
root'
Session m h -> m (Session m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Session m h
sesh
newtype UnionDebt = UnionDebt Int
deriving newtype (Int -> UnionDebt -> ShowS
[UnionDebt] -> ShowS
UnionDebt -> String
(Int -> UnionDebt -> ShowS)
-> (UnionDebt -> String)
-> ([UnionDebt] -> ShowS)
-> Show UnionDebt
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> UnionDebt -> ShowS
showsPrec :: Int -> UnionDebt -> ShowS
$cshow :: UnionDebt -> String
show :: UnionDebt -> String
$cshowList :: [UnionDebt] -> ShowS
showList :: [UnionDebt] -> ShowS
Show, UnionDebt -> UnionDebt -> Bool
(UnionDebt -> UnionDebt -> Bool)
-> (UnionDebt -> UnionDebt -> Bool) -> Eq UnionDebt
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: UnionDebt -> UnionDebt -> Bool
== :: UnionDebt -> UnionDebt -> Bool
$c/= :: UnionDebt -> UnionDebt -> Bool
/= :: UnionDebt -> UnionDebt -> Bool
Eq, Eq UnionDebt
Eq UnionDebt =>
(UnionDebt -> UnionDebt -> Ordering)
-> (UnionDebt -> UnionDebt -> Bool)
-> (UnionDebt -> UnionDebt -> Bool)
-> (UnionDebt -> UnionDebt -> Bool)
-> (UnionDebt -> UnionDebt -> Bool)
-> (UnionDebt -> UnionDebt -> UnionDebt)
-> (UnionDebt -> UnionDebt -> UnionDebt)
-> Ord UnionDebt
UnionDebt -> UnionDebt -> Bool
UnionDebt -> UnionDebt -> Ordering
UnionDebt -> UnionDebt -> UnionDebt
forall a.
Eq a =>
(a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
$ccompare :: UnionDebt -> UnionDebt -> Ordering
compare :: UnionDebt -> UnionDebt -> Ordering
$c< :: UnionDebt -> UnionDebt -> Bool
< :: UnionDebt -> UnionDebt -> Bool
$c<= :: UnionDebt -> UnionDebt -> Bool
<= :: UnionDebt -> UnionDebt -> Bool
$c> :: UnionDebt -> UnionDebt -> Bool
> :: UnionDebt -> UnionDebt -> Bool
$c>= :: UnionDebt -> UnionDebt -> Bool
>= :: UnionDebt -> UnionDebt -> Bool
$cmax :: UnionDebt -> UnionDebt -> UnionDebt
max :: UnionDebt -> UnionDebt -> UnionDebt
$cmin :: UnionDebt -> UnionDebt -> UnionDebt
min :: UnionDebt -> UnionDebt -> UnionDebt
Ord, Integer -> UnionDebt
UnionDebt -> UnionDebt
UnionDebt -> UnionDebt -> UnionDebt
(UnionDebt -> UnionDebt -> UnionDebt)
-> (UnionDebt -> UnionDebt -> UnionDebt)
-> (UnionDebt -> UnionDebt -> UnionDebt)
-> (UnionDebt -> UnionDebt)
-> (UnionDebt -> UnionDebt)
-> (UnionDebt -> UnionDebt)
-> (Integer -> UnionDebt)
-> Num UnionDebt
forall a.
(a -> a -> a)
-> (a -> a -> a)
-> (a -> a -> a)
-> (a -> a)
-> (a -> a)
-> (a -> a)
-> (Integer -> a)
-> Num a
$c+ :: UnionDebt -> UnionDebt -> UnionDebt
+ :: UnionDebt -> UnionDebt -> UnionDebt
$c- :: UnionDebt -> UnionDebt -> UnionDebt
- :: UnionDebt -> UnionDebt -> UnionDebt
$c* :: UnionDebt -> UnionDebt -> UnionDebt
* :: UnionDebt -> UnionDebt -> UnionDebt
$cnegate :: UnionDebt -> UnionDebt
negate :: UnionDebt -> UnionDebt
$cabs :: UnionDebt -> UnionDebt
abs :: UnionDebt -> UnionDebt
$csignum :: UnionDebt -> UnionDebt
signum :: UnionDebt -> UnionDebt
$cfromInteger :: Integer -> UnionDebt
fromInteger :: Integer -> UnionDebt
Num)
{-# SPECIALISE remainingUnionDebt :: Table IO h -> IO UnionDebt #-}
remainingUnionDebt ::
(MonadSTM m, MonadMVar m, MonadThrow m, PrimMonad m)
=> Table m h -> m UnionDebt
remainingUnionDebt :: forall (m :: * -> *) h.
(MonadSTM m, MonadMVar m, MonadThrow m, PrimMonad m) =>
Table m h -> m UnionDebt
remainingUnionDebt Table m h
t = do
Tracer m TableTrace -> TableTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith (Table m h -> Tracer m TableTrace
forall (m :: * -> *) h. Table m h -> Tracer m TableTrace
tableTracer Table m h
t) TableTrace
TraceRemainingUnionDebt
Table m h -> (TableEnv m h -> m UnionDebt) -> m UnionDebt
forall (m :: * -> *) h a.
(MonadSTM m, MonadThrow m) =>
Table m h -> (TableEnv m h -> m a) -> m a
withKeepTableOpen Table m h
t ((TableEnv m h -> m UnionDebt) -> m UnionDebt)
-> (TableEnv m h -> m UnionDebt) -> m UnionDebt
forall a b. (a -> b) -> a -> b
$ \TableEnv m h
tEnv -> do
RWVar m (TableContent m h)
-> (TableContent m h -> m UnionDebt) -> m UnionDebt
forall (m :: * -> *) a b.
(MonadSTM m, MonadThrow m) =>
RWVar m a -> (a -> m b) -> m b
RW.withReadAccess (TableEnv m h -> RWVar m (TableContent m h)
forall (m :: * -> *) h. TableEnv m h -> RWVar m (TableContent m h)
tableContent TableEnv m h
tEnv) ((TableContent m h -> m UnionDebt) -> m UnionDebt)
-> (TableContent m h -> m UnionDebt) -> m UnionDebt
forall a b. (a -> b) -> a -> b
$ \TableContent m h
tableContent -> do
case TableContent m h -> UnionLevel m h
forall (m :: * -> *) h. TableContent m h -> UnionLevel m h
tableUnionLevel TableContent m h
tableContent of
UnionLevel m h
NoUnion ->
UnionDebt -> m UnionDebt
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int -> UnionDebt
UnionDebt Int
0)
Union Ref (MergingTree m h)
mt UnionCache m h
_ -> do
(MergeDebt (MergeCredits Int
c), NumEntries
_) <- Ref (MergingTree m h) -> m (MergeDebt, NumEntries)
forall (m :: * -> *) h.
(MonadMVar m, PrimMonad m) =>
Ref (MergingTree m h) -> m (MergeDebt, NumEntries)
MT.remainingMergeDebt Ref (MergingTree m h)
mt
UnionDebt -> m UnionDebt
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int -> UnionDebt
UnionDebt Int
c)
newtype UnionCredits = UnionCredits Int
deriving newtype (Int -> UnionCredits -> ShowS
[UnionCredits] -> ShowS
UnionCredits -> String
(Int -> UnionCredits -> ShowS)
-> (UnionCredits -> String)
-> ([UnionCredits] -> ShowS)
-> Show UnionCredits
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> UnionCredits -> ShowS
showsPrec :: Int -> UnionCredits -> ShowS
$cshow :: UnionCredits -> String
show :: UnionCredits -> String
$cshowList :: [UnionCredits] -> ShowS
showList :: [UnionCredits] -> ShowS
Show, UnionCredits -> UnionCredits -> Bool
(UnionCredits -> UnionCredits -> Bool)
-> (UnionCredits -> UnionCredits -> Bool) -> Eq UnionCredits
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: UnionCredits -> UnionCredits -> Bool
== :: UnionCredits -> UnionCredits -> Bool
$c/= :: UnionCredits -> UnionCredits -> Bool
/= :: UnionCredits -> UnionCredits -> Bool
Eq, Eq UnionCredits
Eq UnionCredits =>
(UnionCredits -> UnionCredits -> Ordering)
-> (UnionCredits -> UnionCredits -> Bool)
-> (UnionCredits -> UnionCredits -> Bool)
-> (UnionCredits -> UnionCredits -> Bool)
-> (UnionCredits -> UnionCredits -> Bool)
-> (UnionCredits -> UnionCredits -> UnionCredits)
-> (UnionCredits -> UnionCredits -> UnionCredits)
-> Ord UnionCredits
UnionCredits -> UnionCredits -> Bool
UnionCredits -> UnionCredits -> Ordering
UnionCredits -> UnionCredits -> UnionCredits
forall a.
Eq a =>
(a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
$ccompare :: UnionCredits -> UnionCredits -> Ordering
compare :: UnionCredits -> UnionCredits -> Ordering
$c< :: UnionCredits -> UnionCredits -> Bool
< :: UnionCredits -> UnionCredits -> Bool
$c<= :: UnionCredits -> UnionCredits -> Bool
<= :: UnionCredits -> UnionCredits -> Bool
$c> :: UnionCredits -> UnionCredits -> Bool
> :: UnionCredits -> UnionCredits -> Bool
$c>= :: UnionCredits -> UnionCredits -> Bool
>= :: UnionCredits -> UnionCredits -> Bool
$cmax :: UnionCredits -> UnionCredits -> UnionCredits
max :: UnionCredits -> UnionCredits -> UnionCredits
$cmin :: UnionCredits -> UnionCredits -> UnionCredits
min :: UnionCredits -> UnionCredits -> UnionCredits
Ord, Integer -> UnionCredits
UnionCredits -> UnionCredits
UnionCredits -> UnionCredits -> UnionCredits
(UnionCredits -> UnionCredits -> UnionCredits)
-> (UnionCredits -> UnionCredits -> UnionCredits)
-> (UnionCredits -> UnionCredits -> UnionCredits)
-> (UnionCredits -> UnionCredits)
-> (UnionCredits -> UnionCredits)
-> (UnionCredits -> UnionCredits)
-> (Integer -> UnionCredits)
-> Num UnionCredits
forall a.
(a -> a -> a)
-> (a -> a -> a)
-> (a -> a -> a)
-> (a -> a)
-> (a -> a)
-> (a -> a)
-> (Integer -> a)
-> Num a
$c+ :: UnionCredits -> UnionCredits -> UnionCredits
+ :: UnionCredits -> UnionCredits -> UnionCredits
$c- :: UnionCredits -> UnionCredits -> UnionCredits
- :: UnionCredits -> UnionCredits -> UnionCredits
$c* :: UnionCredits -> UnionCredits -> UnionCredits
* :: UnionCredits -> UnionCredits -> UnionCredits
$cnegate :: UnionCredits -> UnionCredits
negate :: UnionCredits -> UnionCredits
$cabs :: UnionCredits -> UnionCredits
abs :: UnionCredits -> UnionCredits
$csignum :: UnionCredits -> UnionCredits
signum :: UnionCredits -> UnionCredits
$cfromInteger :: Integer -> UnionCredits
fromInteger :: Integer -> UnionCredits
Num)
{-# SPECIALISE supplyUnionCredits ::
ResolveSerialisedValue -> Table IO h -> UnionCredits -> IO UnionCredits #-}
supplyUnionCredits ::
(MonadST m, MonadSTM m, MonadMVar m, MonadMask m)
=> ResolveSerialisedValue -> Table m h -> UnionCredits -> m UnionCredits
supplyUnionCredits :: forall (m :: * -> *) h.
(MonadST m, MonadSTM m, MonadMVar m, MonadMask m) =>
ResolveSerialisedValue
-> Table m h -> UnionCredits -> m UnionCredits
supplyUnionCredits ResolveSerialisedValue
resolve Table m h
t UnionCredits
credits = do
Tracer m TableTrace -> TableTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith (Table m h -> Tracer m TableTrace
forall (m :: * -> *) h. Table m h -> Tracer m TableTrace
tableTracer Table m h
t) (TableTrace -> m ()) -> TableTrace -> m ()
forall a b. (a -> b) -> a -> b
$ UnionCredits -> TableTrace
TraceSupplyUnionCredits UnionCredits
credits
Table m h -> (TableEnv m h -> m UnionCredits) -> m UnionCredits
forall (m :: * -> *) h a.
(MonadSTM m, MonadThrow m) =>
Table m h -> (TableEnv m h -> m a) -> m a
withKeepTableOpen Table m h
t ((TableEnv m h -> m UnionCredits) -> m UnionCredits)
-> (TableEnv m h -> m UnionCredits) -> m UnionCredits
forall a b. (a -> b) -> a -> b
$ \TableEnv m h
tEnv -> do
UnionCredits
leftovers <- RWVar m (TableContent m h)
-> (TableContent m h -> m UnionCredits) -> m UnionCredits
forall (m :: * -> *) a b.
(MonadSTM m, MonadThrow m) =>
RWVar m a -> (a -> m b) -> m b
RW.withReadAccess (TableEnv m h -> RWVar m (TableContent m h)
forall (m :: * -> *) h. TableEnv m h -> RWVar m (TableContent m h)
tableContent TableEnv m h
tEnv) ((TableContent m h -> m UnionCredits) -> m UnionCredits)
-> (TableContent m h -> m UnionCredits) -> m UnionCredits
forall a b. (a -> b) -> a -> b
$ \TableContent m h
tc -> do
case TableContent m h -> UnionLevel m h
forall (m :: * -> *) h. TableContent m h -> UnionLevel m h
tableUnionLevel TableContent m h
tc of
UnionLevel m h
NoUnion ->
UnionCredits -> m UnionCredits
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (UnionCredits -> UnionCredits -> UnionCredits
forall a. Ord a => a -> a -> a
max UnionCredits
0 UnionCredits
credits)
Union Ref (MergingTree m h)
mt UnionCache m h
_ -> do
let conf :: TableConfig
conf = Table m h -> TableConfig
forall (m :: * -> *) h. Table m h -> TableConfig
tableConfig Table m h
t
let AllocNumEntries Int
x = TableConfig -> WriteBufferAlloc
confWriteBufferAlloc TableConfig
conf
let thresh :: CreditThreshold
thresh = UnspentCredits -> CreditThreshold
MR.CreditThreshold (MergeCredits -> UnspentCredits
MR.UnspentCredits (Int -> MergeCredits
MergeCredits Int
x))
MergeCredits Int
leftovers <-
HasFS m h
-> HasBlockIO m h
-> ResolveSerialisedValue
-> Salt
-> RunParams
-> CreditThreshold
-> SessionRoot
-> UniqCounter m
-> Ref (MergingTree m h)
-> MergeCredits
-> m MergeCredits
forall (m :: * -> *) h.
(MonadMVar m, MonadST m, MonadSTM m, MonadMask m) =>
HasFS m h
-> HasBlockIO m h
-> ResolveSerialisedValue
-> Salt
-> RunParams
-> CreditThreshold
-> SessionRoot
-> UniqCounter m
-> Ref (MergingTree m h)
-> MergeCredits
-> m MergeCredits
MT.supplyCredits
(TableEnv m h -> HasFS m h
forall (m :: * -> *) h. TableEnv m h -> HasFS m h
tableHasFS TableEnv m h
tEnv)
(TableEnv m h -> HasBlockIO m h
forall (m :: * -> *) h. TableEnv m h -> HasBlockIO m h
tableHasBlockIO TableEnv m h
tEnv)
ResolveSerialisedValue
resolve
(TableEnv m h -> Salt
forall (m :: * -> *) h. TableEnv m h -> Salt
tableSessionSalt TableEnv m h
tEnv)
(TableConfig -> RunLevelNo -> RunParams
runParamsForLevel TableConfig
conf RunLevelNo
UnionLevel)
CreditThreshold
thresh
(TableEnv m h -> SessionRoot
forall (m :: * -> *) h. TableEnv m h -> SessionRoot
tableSessionRoot TableEnv m h
tEnv)
(Table m h -> UniqCounter m
forall (m :: * -> *) h. Table m h -> UniqCounter m
tableSessionUniqCounter Table m h
t)
Ref (MergingTree m h)
mt
(let UnionCredits Int
c = UnionCredits
credits in Int -> MergeCredits
MergeCredits Int
c)
UnionCredits -> m UnionCredits
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int -> UnionCredits
UnionCredits Int
leftovers)
Tracer m TableTrace -> TableTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith (Table m h -> Tracer m TableTrace
forall (m :: * -> *) h. Table m h -> Tracer m TableTrace
tableTracer Table m h
t) (TableTrace -> m ()) -> TableTrace -> m ()
forall a b. (a -> b) -> a -> b
$ UnionCredits -> UnionCredits -> TableTrace
TraceSuppliedUnionCredits UnionCredits
credits UnionCredits
leftovers
m (TableContent m h)
-> (TableContent m h -> m ())
-> (ActionRegistry m -> TableContent m h -> m (TableContent m h))
-> m ()
forall (m :: * -> *) st.
(PrimMonad m, MonadCatch m) =>
m st -> (st -> m ()) -> (ActionRegistry m -> st -> m st) -> m ()
modifyWithActionRegistry_
(RWVar m (TableContent m h) -> m (TableContent m h)
forall (m :: * -> *) a.
(MonadSTM m, MonadCatch m) =>
RWVar m a -> m a
RW.unsafeAcquireWriteAccess (TableEnv m h -> RWVar m (TableContent m h)
forall (m :: * -> *) h. TableEnv m h -> RWVar m (TableContent m h)
tableContent TableEnv m h
tEnv))
(STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ())
-> (TableContent m h -> STM m ()) -> TableContent m h -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. RWVar m (TableContent m h) -> TableContent m h -> STM m ()
forall (m :: * -> *) a. MonadSTM m => RWVar m a -> a -> STM m ()
RW.unsafeReleaseWriteAccess (TableEnv m h -> RWVar m (TableContent m h)
forall (m :: * -> *) h. TableEnv m h -> RWVar m (TableContent m h)
tableContent TableEnv m h
tEnv))
((ActionRegistry m -> TableContent m h -> m (TableContent m h))
-> m ())
-> (ActionRegistry m -> TableContent m h -> m (TableContent m h))
-> m ()
forall a b. (a -> b) -> a -> b
$ \ActionRegistry m
reg TableContent m h
tc ->
case TableContent m h -> UnionLevel m h
forall (m :: * -> *) h. TableContent m h -> UnionLevel m h
tableUnionLevel TableContent m h
tc of
UnionLevel m h
NoUnion -> TableContent m h -> m (TableContent m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure TableContent m h
tc
Union Ref (MergingTree m h)
mt UnionCache m h
cache -> do
UnionLevel m h
unionLevel' <- Ref (MergingTree m h) -> m Bool
forall (m :: * -> *) h.
MonadMVar m =>
Ref (MergingTree m h) -> m Bool
MT.isStructurallyEmpty Ref (MergingTree m h)
mt m Bool -> (Bool -> m (UnionLevel m h)) -> m (UnionLevel m h)
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Bool
True ->
UnionLevel m h -> m (UnionLevel m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure UnionLevel m h
forall (m :: * -> *) h. UnionLevel m h
NoUnion
Bool
False -> do
UnionCache m h
cache' <- ActionRegistry m -> Ref (MergingTree m h) -> m (UnionCache m h)
forall (m :: * -> *) h.
(PrimMonad m, MonadMVar m, MonadMask m) =>
ActionRegistry m -> Ref (MergingTree m h) -> m (UnionCache m h)
mkUnionCache ActionRegistry m
reg Ref (MergingTree m h)
mt
ActionRegistry m -> UnionCache m h -> m ()
forall (m :: * -> *) h.
(PrimMonad m, MonadMask m) =>
ActionRegistry m -> UnionCache m h -> m ()
releaseUnionCache ActionRegistry m
reg UnionCache m h
cache
UnionLevel m h -> m (UnionLevel m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Ref (MergingTree m h) -> UnionCache m h -> UnionLevel m h
forall (m :: * -> *) h.
Ref (MergingTree m h) -> UnionCache m h -> UnionLevel m h
Union Ref (MergingTree m h)
mt UnionCache m h
cache')
TableContent m h -> m (TableContent m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure TableContent m h
tc { tableUnionLevel = unionLevel' }
UnionCredits -> m UnionCredits
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure UnionCredits
leftovers