{-# LANGUAGE DataKinds #-}
module Database.LSMTree.Internal (
Session' (..)
, Table' (..)
, Cursor' (..)
, NormalTable (..)
, NormalCursor (..)
, MonoidalTable (..)
, MonoidalCursor (..)
, SessionDirDoesNotExistError (..)
, SessionDirLockedError (..)
, SessionDirCorruptedError (..)
, SessionClosedError (..)
, TableClosedError (..)
, TableCorruptedError (..)
, TableTooLargeError (..)
, TableUnionNotCompatibleError (..)
, SnapshotExistsError (..)
, SnapshotDoesNotExistError (..)
, SnapshotCorruptedError (..)
, SnapshotNotCompatibleError (..)
, BlobRefInvalidError (..)
, CursorClosedError (..)
, FileFormat (..)
, FileCorruptedError (..)
, Paths.InvalidSnapshotNameError (..)
, LSMTreeTrace (..)
, TableTrace (..)
, Session (..)
, SessionState (..)
, SessionEnv (..)
, withOpenSession
, withSession
, openSession
, closeSession
, Table (..)
, TableState (..)
, TableEnv (..)
, withOpenTable
, ResolveSerialisedValue
, withTable
, new
, close
, lookups
, rangeLookup
, updates
, retrieveBlobs
, Cursor (..)
, CursorState (..)
, CursorEnv (..)
, OffsetKey (..)
, withCursor
, newCursor
, closeCursor
, readCursor
, readCursorWhile
, SnapshotLabel
, createSnapshot
, openSnapshot
, deleteSnapshot
, doesSnapshotExist
, listSnapshots
, duplicate
, unions
, UnionDebt (..)
, remainingUnionDebt
, UnionCredits (..)
, supplyUnionCredits
) where
import Control.ActionRegistry
import Control.Concurrent.Class.MonadMVar.Strict
import Control.Concurrent.Class.MonadSTM (MonadSTM (..))
import Control.Concurrent.Class.MonadSTM.RWVar (RWVar)
import qualified Control.Concurrent.Class.MonadSTM.RWVar as RW
import Control.DeepSeq
import Control.Monad (forM, unless, void)
import Control.Monad.Class.MonadAsync as Async
import Control.Monad.Class.MonadST (MonadST (..))
import Control.Monad.Class.MonadThrow
import Control.Monad.Primitive
import Control.RefCount
import Control.Tracer
import Data.Either (fromRight)
import Data.Foldable
import Data.Kind
import Data.List.NonEmpty (NonEmpty (..))
import qualified Data.List.NonEmpty as NE
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import Data.Maybe (catMaybes, fromMaybe, maybeToList)
import Data.Monoid (First (..))
import qualified Data.Set as Set
import Data.Typeable
import qualified Data.Vector as V
import Database.LSMTree.Internal.Arena (ArenaManager, newArenaManager)
import Database.LSMTree.Internal.BlobRef (WeakBlobRef (..))
import qualified Database.LSMTree.Internal.BlobRef as BlobRef
import Database.LSMTree.Internal.Config
import Database.LSMTree.Internal.CRC32C (FileCorruptedError (..),
FileFormat (..))
import qualified Database.LSMTree.Internal.Cursor as Cursor
import Database.LSMTree.Internal.Entry (Entry, NumEntries (..))
import Database.LSMTree.Internal.IncomingRun (IncomingRun (..))
import Database.LSMTree.Internal.Lookup (ResolveSerialisedValue,
TableCorruptedError (..), lookupsIO,
lookupsIOWithWriteBuffer)
import Database.LSMTree.Internal.MergeSchedule
import Database.LSMTree.Internal.MergingRun (TableTooLargeError (..))
import qualified Database.LSMTree.Internal.MergingRun as MR
import Database.LSMTree.Internal.MergingTree
import qualified Database.LSMTree.Internal.MergingTree as MT
import qualified Database.LSMTree.Internal.MergingTree.Lookup as MT
import Database.LSMTree.Internal.Paths (SessionRoot (..),
SnapshotMetaDataChecksumFile (..),
SnapshotMetaDataFile (..), SnapshotName)
import qualified Database.LSMTree.Internal.Paths as Paths
import Database.LSMTree.Internal.Range (Range (..))
import Database.LSMTree.Internal.Run (Run)
import qualified Database.LSMTree.Internal.Run as Run
import Database.LSMTree.Internal.RunNumber
import Database.LSMTree.Internal.RunReaders (OffsetKey (..))
import qualified Database.LSMTree.Internal.RunReaders as Readers
import Database.LSMTree.Internal.Serialise (SerialisedBlob (..),
SerialisedKey, SerialisedValue)
import Database.LSMTree.Internal.Snapshot
import Database.LSMTree.Internal.Snapshot.Codec
import Database.LSMTree.Internal.UniqCounter
import qualified Database.LSMTree.Internal.Vector as V
import qualified Database.LSMTree.Internal.WriteBuffer as WB
import qualified Database.LSMTree.Internal.WriteBufferBlobs as WBB
import qualified System.FS.API as FS
import System.FS.API (FsError, FsErrorPath (..), FsPath, HasFS)
import qualified System.FS.BlockIO.API as FS
import System.FS.BlockIO.API (HasBlockIO)
type Session' :: (Type -> Type) -> Type
data Session' m = forall h. Typeable h => Session' !(Session m h)
instance NFData (Session' m) where
rnf :: Session' m -> ()
rnf (Session' Session m h
s) = Session m h -> ()
forall a. NFData a => a -> ()
rnf Session m h
s
type Table' :: (Type -> Type) -> Type -> Type -> Type -> Type
data Table' m k v b = forall h. Typeable h => Table' (Table m h)
instance NFData (Table' m k v b) where
rnf :: Table' m k v b -> ()
rnf (Table' Table m h
t) = Table m h -> ()
forall a. NFData a => a -> ()
rnf Table m h
t
type Cursor' :: (Type -> Type) -> Type -> Type -> Type -> Type
data Cursor' m k v b = forall h. Typeable h => Cursor' (Cursor m h)
instance NFData (Cursor' m k v b) where
rnf :: Cursor' m k v b -> ()
rnf (Cursor' Cursor m h
t) = Cursor m h -> ()
forall a. NFData a => a -> ()
rnf Cursor m h
t
type NormalTable :: (Type -> Type) -> Type -> Type -> Type -> Type
data NormalTable m k v b = forall h. Typeable h =>
NormalTable !(Table m h)
instance NFData (NormalTable m k v b) where
rnf :: NormalTable m k v b -> ()
rnf (NormalTable Table m h
t) = Table m h -> ()
forall a. NFData a => a -> ()
rnf Table m h
t
type NormalCursor :: (Type -> Type) -> Type -> Type -> Type -> Type
data NormalCursor m k v b = forall h. Typeable h =>
NormalCursor !(Cursor m h)
instance NFData (NormalCursor m k v b) where
rnf :: NormalCursor m k v b -> ()
rnf (NormalCursor Cursor m h
c) = Cursor m h -> ()
forall a. NFData a => a -> ()
rnf Cursor m h
c
type MonoidalTable :: (Type -> Type) -> Type -> Type -> Type
data MonoidalTable m k v = forall h. Typeable h =>
MonoidalTable !(Table m h)
instance NFData (MonoidalTable m k v) where
rnf :: MonoidalTable m k v -> ()
rnf (MonoidalTable Table m h
t) = Table m h -> ()
forall a. NFData a => a -> ()
rnf Table m h
t
type MonoidalCursor :: (Type -> Type) -> Type -> Type -> Type
data MonoidalCursor m k v = forall h. Typeable h =>
MonoidalCursor !(Cursor m h)
instance NFData (MonoidalCursor m k v) where
rnf :: MonoidalCursor m k v -> ()
rnf (MonoidalCursor Cursor m h
c) = Cursor m h -> ()
forall a. NFData a => a -> ()
rnf Cursor m h
c
data LSMTreeTrace =
TraceOpenSession FsPath
| TraceNewSession
| TraceRestoreSession
| TraceCloseSession
| TraceNewTable
| TraceOpenSnapshot SnapshotName TableConfigOverride
| TraceTable TableId TableTrace
| TraceDeleteSnapshot SnapshotName
| TraceListSnapshots
| TraceCursor CursorId CursorTrace
| TraceUnions (NonEmpty TableId)
deriving stock Int -> LSMTreeTrace -> ShowS
[LSMTreeTrace] -> ShowS
LSMTreeTrace -> String
(Int -> LSMTreeTrace -> ShowS)
-> (LSMTreeTrace -> String)
-> ([LSMTreeTrace] -> ShowS)
-> Show LSMTreeTrace
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> LSMTreeTrace -> ShowS
showsPrec :: Int -> LSMTreeTrace -> ShowS
$cshow :: LSMTreeTrace -> String
show :: LSMTreeTrace -> String
$cshowList :: [LSMTreeTrace] -> ShowS
showList :: [LSMTreeTrace] -> ShowS
Show
data TableTrace =
TraceCreateTable TableConfig
| TraceCloseTable
| TraceLookups Int
| TraceRangeLookup (Range SerialisedKey)
| TraceUpdates Int
| TraceMerge (AtLevel MergeTrace)
| TraceSnapshot SnapshotName
| TraceDuplicate
| TraceRemainingUnionDebt
| TraceSupplyUnionCredits UnionCredits
deriving stock Int -> TableTrace -> ShowS
[TableTrace] -> ShowS
TableTrace -> String
(Int -> TableTrace -> ShowS)
-> (TableTrace -> String)
-> ([TableTrace] -> ShowS)
-> Show TableTrace
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> TableTrace -> ShowS
showsPrec :: Int -> TableTrace -> ShowS
$cshow :: TableTrace -> String
show :: TableTrace -> String
$cshowList :: [TableTrace] -> ShowS
showList :: [TableTrace] -> ShowS
Show
data CursorTrace =
TraceCreateCursor TableId
| TraceCloseCursor
| TraceReadCursor Int
deriving stock Int -> CursorTrace -> ShowS
[CursorTrace] -> ShowS
CursorTrace -> String
(Int -> CursorTrace -> ShowS)
-> (CursorTrace -> String)
-> ([CursorTrace] -> ShowS)
-> Show CursorTrace
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> CursorTrace -> ShowS
showsPrec :: Int -> CursorTrace -> ShowS
$cshow :: CursorTrace -> String
show :: CursorTrace -> String
$cshowList :: [CursorTrace] -> ShowS
showList :: [CursorTrace] -> ShowS
Show
data Session m h = Session {
forall (m :: * -> *) h. Session m h -> RWVar m (SessionState m h)
sessionState :: !(RWVar m (SessionState m h))
, forall (m :: * -> *) h. Session m h -> Tracer m LSMTreeTrace
sessionTracer :: !(Tracer m LSMTreeTrace)
}
instance NFData (Session m h) where
rnf :: Session m h -> ()
rnf (Session RWVar m (SessionState m h)
a Tracer m LSMTreeTrace
b) = RWVar m (SessionState m h) -> ()
forall a. NFData a => a -> ()
rnf RWVar m (SessionState m h)
a () -> () -> ()
forall a b. a -> b -> b
`seq` Tracer m LSMTreeTrace -> ()
forall a. a -> ()
rwhnf Tracer m LSMTreeTrace
b
data SessionState m h =
SessionOpen !(SessionEnv m h)
| SessionClosed
data SessionEnv m h = SessionEnv {
forall (m :: * -> *) h. SessionEnv m h -> SessionRoot
sessionRoot :: !SessionRoot
, forall (m :: * -> *) h. SessionEnv m h -> HasFS m h
sessionHasFS :: !(HasFS m h)
, forall (m :: * -> *) h. SessionEnv m h -> HasBlockIO m h
sessionHasBlockIO :: !(HasBlockIO m h)
, forall (m :: * -> *) h. SessionEnv m h -> LockFileHandle m
sessionLockFile :: !(FS.LockFileHandle m)
, forall (m :: * -> *) h. SessionEnv m h -> UniqCounter m
sessionUniqCounter :: !(UniqCounter m)
, forall (m :: * -> *) h.
SessionEnv m h -> StrictMVar m (Map TableId (Table m h))
sessionOpenTables :: !(StrictMVar m (Map TableId (Table m h)))
, forall (m :: * -> *) h.
SessionEnv m h -> StrictMVar m (Map CursorId (Cursor m h))
sessionOpenCursors :: !(StrictMVar m (Map CursorId (Cursor m h)))
}
data SessionClosedError
= ErrSessionClosed
deriving stock (Int -> SessionClosedError -> ShowS
[SessionClosedError] -> ShowS
SessionClosedError -> String
(Int -> SessionClosedError -> ShowS)
-> (SessionClosedError -> String)
-> ([SessionClosedError] -> ShowS)
-> Show SessionClosedError
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> SessionClosedError -> ShowS
showsPrec :: Int -> SessionClosedError -> ShowS
$cshow :: SessionClosedError -> String
show :: SessionClosedError -> String
$cshowList :: [SessionClosedError] -> ShowS
showList :: [SessionClosedError] -> ShowS
Show, SessionClosedError -> SessionClosedError -> Bool
(SessionClosedError -> SessionClosedError -> Bool)
-> (SessionClosedError -> SessionClosedError -> Bool)
-> Eq SessionClosedError
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: SessionClosedError -> SessionClosedError -> Bool
== :: SessionClosedError -> SessionClosedError -> Bool
$c/= :: SessionClosedError -> SessionClosedError -> Bool
/= :: SessionClosedError -> SessionClosedError -> Bool
Eq)
deriving anyclass (Show SessionClosedError
Typeable SessionClosedError
(Typeable SessionClosedError, Show SessionClosedError) =>
(SessionClosedError -> SomeException)
-> (SomeException -> Maybe SessionClosedError)
-> (SessionClosedError -> String)
-> Exception SessionClosedError
SomeException -> Maybe SessionClosedError
SessionClosedError -> String
SessionClosedError -> SomeException
forall e.
(Typeable e, Show e) =>
(e -> SomeException)
-> (SomeException -> Maybe e) -> (e -> String) -> Exception e
$ctoException :: SessionClosedError -> SomeException
toException :: SessionClosedError -> SomeException
$cfromException :: SomeException -> Maybe SessionClosedError
fromException :: SomeException -> Maybe SessionClosedError
$cdisplayException :: SessionClosedError -> String
displayException :: SessionClosedError -> String
Exception)
{-# INLINE withOpenSession #-}
{-# SPECIALISE withOpenSession ::
Session IO h
-> (SessionEnv IO h -> IO a)
-> IO a #-}
withOpenSession ::
(MonadSTM m, MonadThrow m)
=> Session m h
-> (SessionEnv m h -> m a)
-> m a
withOpenSession :: forall (m :: * -> *) h a.
(MonadSTM m, MonadThrow m) =>
Session m h -> (SessionEnv m h -> m a) -> m a
withOpenSession Session m h
sesh SessionEnv m h -> m a
action = RWVar m (SessionState m h) -> (SessionState m h -> m a) -> m a
forall (m :: * -> *) a b.
(MonadSTM m, MonadThrow m) =>
RWVar m a -> (a -> m b) -> m b
RW.withReadAccess (Session m h -> RWVar m (SessionState m h)
forall (m :: * -> *) h. Session m h -> RWVar m (SessionState m h)
sessionState Session m h
sesh) ((SessionState m h -> m a) -> m a)
-> (SessionState m h -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ \case
SessionState m h
SessionClosed -> SessionClosedError -> m a
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO SessionClosedError
ErrSessionClosed
SessionOpen SessionEnv m h
seshEnv -> SessionEnv m h -> m a
action SessionEnv m h
seshEnv
{-# SPECIALISE withSession ::
Tracer IO LSMTreeTrace
-> HasFS IO h
-> HasBlockIO IO h
-> FsPath
-> (Session IO h -> IO a)
-> IO a #-}
withSession ::
(MonadMask m, MonadSTM m, MonadMVar m, PrimMonad m)
=> Tracer m LSMTreeTrace
-> HasFS m h
-> HasBlockIO m h
-> FsPath
-> (Session m h -> m a)
-> m a
withSession :: forall (m :: * -> *) h a.
(MonadMask m, MonadSTM m, MonadMVar m, PrimMonad m) =>
Tracer m LSMTreeTrace
-> HasFS m h
-> HasBlockIO m h
-> FsPath
-> (Session m h -> m a)
-> m a
withSession Tracer m LSMTreeTrace
tr HasFS m h
hfs HasBlockIO m h
hbio FsPath
dir = m (Session m h)
-> (Session m h -> m ()) -> (Session m h -> m a) -> m a
forall a b c. m a -> (a -> m b) -> (a -> m c) -> m c
forall (m :: * -> *) a b c.
MonadThrow m =>
m a -> (a -> m b) -> (a -> m c) -> m c
bracket (Tracer m LSMTreeTrace
-> HasFS m h -> HasBlockIO m h -> FsPath -> m (Session m h)
forall (m :: * -> *) h.
(MonadSTM m, MonadMVar m, PrimMonad m, MonadMask m) =>
Tracer m LSMTreeTrace
-> HasFS m h -> HasBlockIO m h -> FsPath -> m (Session m h)
openSession Tracer m LSMTreeTrace
tr HasFS m h
hfs HasBlockIO m h
hbio FsPath
dir) Session m h -> m ()
forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadMVar m, PrimMonad m) =>
Session m h -> m ()
closeSession
data SessionDirDoesNotExistError
= ErrSessionDirDoesNotExist !FsErrorPath
deriving stock (Int -> SessionDirDoesNotExistError -> ShowS
[SessionDirDoesNotExistError] -> ShowS
SessionDirDoesNotExistError -> String
(Int -> SessionDirDoesNotExistError -> ShowS)
-> (SessionDirDoesNotExistError -> String)
-> ([SessionDirDoesNotExistError] -> ShowS)
-> Show SessionDirDoesNotExistError
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> SessionDirDoesNotExistError -> ShowS
showsPrec :: Int -> SessionDirDoesNotExistError -> ShowS
$cshow :: SessionDirDoesNotExistError -> String
show :: SessionDirDoesNotExistError -> String
$cshowList :: [SessionDirDoesNotExistError] -> ShowS
showList :: [SessionDirDoesNotExistError] -> ShowS
Show, SessionDirDoesNotExistError -> SessionDirDoesNotExistError -> Bool
(SessionDirDoesNotExistError
-> SessionDirDoesNotExistError -> Bool)
-> (SessionDirDoesNotExistError
-> SessionDirDoesNotExistError -> Bool)
-> Eq SessionDirDoesNotExistError
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: SessionDirDoesNotExistError -> SessionDirDoesNotExistError -> Bool
== :: SessionDirDoesNotExistError -> SessionDirDoesNotExistError -> Bool
$c/= :: SessionDirDoesNotExistError -> SessionDirDoesNotExistError -> Bool
/= :: SessionDirDoesNotExistError -> SessionDirDoesNotExistError -> Bool
Eq)
deriving anyclass (Show SessionDirDoesNotExistError
Typeable SessionDirDoesNotExistError
(Typeable SessionDirDoesNotExistError,
Show SessionDirDoesNotExistError) =>
(SessionDirDoesNotExistError -> SomeException)
-> (SomeException -> Maybe SessionDirDoesNotExistError)
-> (SessionDirDoesNotExistError -> String)
-> Exception SessionDirDoesNotExistError
SomeException -> Maybe SessionDirDoesNotExistError
SessionDirDoesNotExistError -> String
SessionDirDoesNotExistError -> SomeException
forall e.
(Typeable e, Show e) =>
(e -> SomeException)
-> (SomeException -> Maybe e) -> (e -> String) -> Exception e
$ctoException :: SessionDirDoesNotExistError -> SomeException
toException :: SessionDirDoesNotExistError -> SomeException
$cfromException :: SomeException -> Maybe SessionDirDoesNotExistError
fromException :: SomeException -> Maybe SessionDirDoesNotExistError
$cdisplayException :: SessionDirDoesNotExistError -> String
displayException :: SessionDirDoesNotExistError -> String
Exception)
data SessionDirLockedError
= ErrSessionDirLocked !FsErrorPath
deriving stock (Int -> SessionDirLockedError -> ShowS
[SessionDirLockedError] -> ShowS
SessionDirLockedError -> String
(Int -> SessionDirLockedError -> ShowS)
-> (SessionDirLockedError -> String)
-> ([SessionDirLockedError] -> ShowS)
-> Show SessionDirLockedError
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> SessionDirLockedError -> ShowS
showsPrec :: Int -> SessionDirLockedError -> ShowS
$cshow :: SessionDirLockedError -> String
show :: SessionDirLockedError -> String
$cshowList :: [SessionDirLockedError] -> ShowS
showList :: [SessionDirLockedError] -> ShowS
Show, SessionDirLockedError -> SessionDirLockedError -> Bool
(SessionDirLockedError -> SessionDirLockedError -> Bool)
-> (SessionDirLockedError -> SessionDirLockedError -> Bool)
-> Eq SessionDirLockedError
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: SessionDirLockedError -> SessionDirLockedError -> Bool
== :: SessionDirLockedError -> SessionDirLockedError -> Bool
$c/= :: SessionDirLockedError -> SessionDirLockedError -> Bool
/= :: SessionDirLockedError -> SessionDirLockedError -> Bool
Eq)
deriving anyclass (Show SessionDirLockedError
Typeable SessionDirLockedError
(Typeable SessionDirLockedError, Show SessionDirLockedError) =>
(SessionDirLockedError -> SomeException)
-> (SomeException -> Maybe SessionDirLockedError)
-> (SessionDirLockedError -> String)
-> Exception SessionDirLockedError
SomeException -> Maybe SessionDirLockedError
SessionDirLockedError -> String
SessionDirLockedError -> SomeException
forall e.
(Typeable e, Show e) =>
(e -> SomeException)
-> (SomeException -> Maybe e) -> (e -> String) -> Exception e
$ctoException :: SessionDirLockedError -> SomeException
toException :: SessionDirLockedError -> SomeException
$cfromException :: SomeException -> Maybe SessionDirLockedError
fromException :: SomeException -> Maybe SessionDirLockedError
$cdisplayException :: SessionDirLockedError -> String
displayException :: SessionDirLockedError -> String
Exception)
data SessionDirCorruptedError
= ErrSessionDirCorrupted !FsErrorPath
deriving stock (Int -> SessionDirCorruptedError -> ShowS
[SessionDirCorruptedError] -> ShowS
SessionDirCorruptedError -> String
(Int -> SessionDirCorruptedError -> ShowS)
-> (SessionDirCorruptedError -> String)
-> ([SessionDirCorruptedError] -> ShowS)
-> Show SessionDirCorruptedError
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> SessionDirCorruptedError -> ShowS
showsPrec :: Int -> SessionDirCorruptedError -> ShowS
$cshow :: SessionDirCorruptedError -> String
show :: SessionDirCorruptedError -> String
$cshowList :: [SessionDirCorruptedError] -> ShowS
showList :: [SessionDirCorruptedError] -> ShowS
Show, SessionDirCorruptedError -> SessionDirCorruptedError -> Bool
(SessionDirCorruptedError -> SessionDirCorruptedError -> Bool)
-> (SessionDirCorruptedError -> SessionDirCorruptedError -> Bool)
-> Eq SessionDirCorruptedError
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: SessionDirCorruptedError -> SessionDirCorruptedError -> Bool
== :: SessionDirCorruptedError -> SessionDirCorruptedError -> Bool
$c/= :: SessionDirCorruptedError -> SessionDirCorruptedError -> Bool
/= :: SessionDirCorruptedError -> SessionDirCorruptedError -> Bool
Eq)
deriving anyclass (Show SessionDirCorruptedError
Typeable SessionDirCorruptedError
(Typeable SessionDirCorruptedError,
Show SessionDirCorruptedError) =>
(SessionDirCorruptedError -> SomeException)
-> (SomeException -> Maybe SessionDirCorruptedError)
-> (SessionDirCorruptedError -> String)
-> Exception SessionDirCorruptedError
SomeException -> Maybe SessionDirCorruptedError
SessionDirCorruptedError -> String
SessionDirCorruptedError -> SomeException
forall e.
(Typeable e, Show e) =>
(e -> SomeException)
-> (SomeException -> Maybe e) -> (e -> String) -> Exception e
$ctoException :: SessionDirCorruptedError -> SomeException
toException :: SessionDirCorruptedError -> SomeException
$cfromException :: SomeException -> Maybe SessionDirCorruptedError
fromException :: SomeException -> Maybe SessionDirCorruptedError
$cdisplayException :: SessionDirCorruptedError -> String
displayException :: SessionDirCorruptedError -> String
Exception)
{-# SPECIALISE openSession ::
Tracer IO LSMTreeTrace
-> HasFS IO h
-> HasBlockIO IO h
-> FsPath
-> IO (Session IO h) #-}
openSession ::
forall m h.
(MonadSTM m, MonadMVar m, PrimMonad m, MonadMask m)
=> Tracer m LSMTreeTrace
-> HasFS m h
-> HasBlockIO m h
-> FsPath
-> m (Session m h)
openSession :: forall (m :: * -> *) h.
(MonadSTM m, MonadMVar m, PrimMonad m, MonadMask m) =>
Tracer m LSMTreeTrace
-> HasFS m h -> HasBlockIO m h -> FsPath -> m (Session m h)
openSession Tracer m LSMTreeTrace
tr HasFS m h
hfs HasBlockIO m h
hbio FsPath
dir =
(ActionRegistry m -> m (Session m h)) -> m (Session m h)
forall (m :: * -> *) a.
(PrimMonad m, MonadCatch m) =>
(ActionRegistry m -> m a) -> m a
withActionRegistry ((ActionRegistry m -> m (Session m h)) -> m (Session m h))
-> (ActionRegistry m -> m (Session m h)) -> m (Session m h)
forall a b. (a -> b) -> a -> b
$ \ActionRegistry m
reg -> do
Tracer m LSMTreeTrace -> LSMTreeTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith Tracer m LSMTreeTrace
tr (FsPath -> LSMTreeTrace
TraceOpenSession FsPath
dir)
Bool
dirExists <- HasFS m h -> HasCallStack => FsPath -> m Bool
forall (m :: * -> *) h.
HasFS m h -> HasCallStack => FsPath -> m Bool
FS.doesDirectoryExist HasFS m h
hfs FsPath
dir
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
dirExists (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
SessionDirDoesNotExistError -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (FsErrorPath -> SessionDirDoesNotExistError
ErrSessionDirDoesNotExist (HasFS m h -> FsPath -> FsErrorPath
forall (m :: * -> *) h. HasFS m h -> FsPath -> FsErrorPath
FS.mkFsErrorPath HasFS m h
hfs FsPath
dir))
Set String
dirContents <- HasFS m h -> HasCallStack => FsPath -> m (Set String)
forall (m :: * -> *) h.
HasFS m h -> HasCallStack => FsPath -> m (Set String)
FS.listDirectory HasFS m h
hfs FsPath
dir
Either FsError (Maybe (LockFileHandle m))
elock <-
ActionRegistry m
-> (Either FsError (Maybe (LockFileHandle m))
-> Maybe (LockFileHandle m))
-> m (Either FsError (Maybe (LockFileHandle m)))
-> (LockFileHandle m -> m ())
-> m (Either FsError (Maybe (LockFileHandle m)))
forall (m :: * -> *) a b.
(PrimMonad m, MonadMask m, HasCallStack) =>
ActionRegistry m -> (a -> Maybe b) -> m a -> (b -> m ()) -> m a
withRollbackFun ActionRegistry m
reg
(Maybe (LockFileHandle m)
-> Either FsError (Maybe (LockFileHandle m))
-> Maybe (LockFileHandle m)
forall b a. b -> Either a b -> b
fromRight Maybe (LockFileHandle m)
forall a. Maybe a
Nothing)
m (Either FsError (Maybe (LockFileHandle m)))
acquireLock
LockFileHandle m -> m ()
forall {m :: * -> *}. LockFileHandle m -> m ()
releaseLock
case Either FsError (Maybe (LockFileHandle m))
elock of
Left FsError
e
| FsErrorType
FS.FsResourceAlreadyInUse <- FsError -> FsErrorType
FS.fsErrorType FsError
e
, fsep :: FsErrorPath
fsep@(FsErrorPath Maybe MountPoint
_ FsPath
fsp) <- FsError -> FsErrorPath
FS.fsErrorPath FsError
e
, FsPath
fsp FsPath -> FsPath -> Bool
forall a. Eq a => a -> a -> Bool
== FsPath
lockFilePath
-> SessionDirLockedError -> m (Session m h)
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (FsErrorPath -> SessionDirLockedError
ErrSessionDirLocked FsErrorPath
fsep)
Left FsError
e -> FsError -> m (Session m h)
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO FsError
e
Right Maybe (LockFileHandle m)
Nothing -> SessionDirLockedError -> m (Session m h)
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (FsErrorPath -> SessionDirLockedError
ErrSessionDirLocked (HasFS m h -> FsPath -> FsErrorPath
forall (m :: * -> *) h. HasFS m h -> FsPath -> FsErrorPath
FS.mkFsErrorPath HasFS m h
hfs FsPath
lockFilePath))
Right (Just LockFileHandle m
sessionFileLock) ->
if Set String -> Bool
forall a. Set a -> Bool
Set.null Set String
dirContents then ActionRegistry m -> LockFileHandle m -> m (Session m h)
newSession ActionRegistry m
reg LockFileHandle m
sessionFileLock
else ActionRegistry m -> LockFileHandle m -> m (Session m h)
restoreSession ActionRegistry m
reg LockFileHandle m
sessionFileLock
where
root :: SessionRoot
root = FsPath -> SessionRoot
Paths.SessionRoot FsPath
dir
lockFilePath :: FsPath
lockFilePath = SessionRoot -> FsPath
Paths.lockFile SessionRoot
root
activeDirPath :: FsPath
activeDirPath = ActiveDir -> FsPath
Paths.getActiveDir (SessionRoot -> ActiveDir
Paths.activeDir SessionRoot
root)
snapshotsDirPath :: FsPath
snapshotsDirPath = SessionRoot -> FsPath
Paths.snapshotsDir SessionRoot
root
acquireLock :: m (Either FsError (Maybe (LockFileHandle m)))
acquireLock = forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> m (Either e a)
try @m @FsError (m (Maybe (LockFileHandle m))
-> m (Either FsError (Maybe (LockFileHandle m))))
-> m (Maybe (LockFileHandle m))
-> m (Either FsError (Maybe (LockFileHandle m)))
forall a b. (a -> b) -> a -> b
$ HasBlockIO m h
-> FsPath -> LockMode -> m (Maybe (LockFileHandle m))
forall (m :: * -> *) h.
HasBlockIO m h
-> FsPath -> LockMode -> m (Maybe (LockFileHandle m))
FS.tryLockFile HasBlockIO m h
hbio FsPath
lockFilePath LockMode
FS.ExclusiveLock
releaseLock :: LockFileHandle m -> m ()
releaseLock = LockFileHandle m -> m ()
forall {m :: * -> *}. LockFileHandle m -> m ()
FS.hUnlock
mkSession :: LockFileHandle m -> m (Session m h)
mkSession LockFileHandle m
lockFile = do
UniqCounter m
counterVar <- Int -> m (UniqCounter m)
forall (m :: * -> *). PrimMonad m => Int -> m (UniqCounter m)
newUniqCounter Int
0
StrictMVar m (Map TableId (Table m h))
openTablesVar <- Map TableId (Table m h)
-> m (StrictMVar m (Map TableId (Table m h)))
forall (m :: * -> *) a. MonadMVar m => a -> m (StrictMVar m a)
newMVar Map TableId (Table m h)
forall k a. Map k a
Map.empty
StrictMVar m (Map CursorId (Cursor m h))
openCursorsVar <- Map CursorId (Cursor m h)
-> m (StrictMVar m (Map CursorId (Cursor m h)))
forall (m :: * -> *) a. MonadMVar m => a -> m (StrictMVar m a)
newMVar Map CursorId (Cursor m h)
forall k a. Map k a
Map.empty
RWVar m (SessionState m h)
sessionVar <- SessionState m h -> m (RWVar m (SessionState m h))
forall (m :: * -> *) a. MonadSTM m => a -> m (RWVar m a)
RW.new (SessionState m h -> m (RWVar m (SessionState m h)))
-> SessionState m h -> m (RWVar m (SessionState m h))
forall a b. (a -> b) -> a -> b
$ SessionEnv m h -> SessionState m h
forall (m :: * -> *) h. SessionEnv m h -> SessionState m h
SessionOpen (SessionEnv m h -> SessionState m h)
-> SessionEnv m h -> SessionState m h
forall a b. (a -> b) -> a -> b
$ SessionEnv {
sessionRoot :: SessionRoot
sessionRoot = SessionRoot
root
, sessionHasFS :: HasFS m h
sessionHasFS = HasFS m h
hfs
, sessionHasBlockIO :: HasBlockIO m h
sessionHasBlockIO = HasBlockIO m h
hbio
, sessionLockFile :: LockFileHandle m
sessionLockFile = LockFileHandle m
lockFile
, sessionUniqCounter :: UniqCounter m
sessionUniqCounter = UniqCounter m
counterVar
, sessionOpenTables :: StrictMVar m (Map TableId (Table m h))
sessionOpenTables = StrictMVar m (Map TableId (Table m h))
openTablesVar
, sessionOpenCursors :: StrictMVar m (Map CursorId (Cursor m h))
sessionOpenCursors = StrictMVar m (Map CursorId (Cursor m h))
openCursorsVar
}
Session m h -> m (Session m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Session m h -> m (Session m h)) -> Session m h -> m (Session m h)
forall a b. (a -> b) -> a -> b
$! RWVar m (SessionState m h) -> Tracer m LSMTreeTrace -> Session m h
forall (m :: * -> *) h.
RWVar m (SessionState m h) -> Tracer m LSMTreeTrace -> Session m h
Session RWVar m (SessionState m h)
sessionVar Tracer m LSMTreeTrace
tr
newSession :: ActionRegistry m -> LockFileHandle m -> m (Session m h)
newSession ActionRegistry m
reg LockFileHandle m
sessionFileLock = do
Tracer m LSMTreeTrace -> LSMTreeTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith Tracer m LSMTreeTrace
tr LSMTreeTrace
TraceNewSession
ActionRegistry m -> m () -> m () -> m ()
forall (m :: * -> *) a.
(PrimMonad m, MonadMask m, HasCallStack) =>
ActionRegistry m -> m a -> m () -> m a
withRollback_ ActionRegistry m
reg
(HasFS m h -> HasCallStack => FsPath -> m ()
forall (m :: * -> *) h. HasFS m h -> HasCallStack => FsPath -> m ()
FS.createDirectory HasFS m h
hfs FsPath
activeDirPath)
(HasFS m h -> HasCallStack => FsPath -> m ()
forall (m :: * -> *) h. HasFS m h -> HasCallStack => FsPath -> m ()
FS.removeDirectoryRecursive HasFS m h
hfs FsPath
activeDirPath)
ActionRegistry m -> m () -> m () -> m ()
forall (m :: * -> *) a.
(PrimMonad m, MonadMask m, HasCallStack) =>
ActionRegistry m -> m a -> m () -> m a
withRollback_ ActionRegistry m
reg
(HasFS m h -> HasCallStack => FsPath -> m ()
forall (m :: * -> *) h. HasFS m h -> HasCallStack => FsPath -> m ()
FS.createDirectory HasFS m h
hfs FsPath
snapshotsDirPath)
(HasFS m h -> HasCallStack => FsPath -> m ()
forall (m :: * -> *) h. HasFS m h -> HasCallStack => FsPath -> m ()
FS.removeDirectoryRecursive HasFS m h
hfs FsPath
snapshotsDirPath)
LockFileHandle m -> m (Session m h)
mkSession LockFileHandle m
sessionFileLock
restoreSession :: ActionRegistry m -> LockFileHandle m -> m (Session m h)
restoreSession ActionRegistry m
_reg LockFileHandle m
sessionFileLock = do
Tracer m LSMTreeTrace -> LSMTreeTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith Tracer m LSMTreeTrace
tr LSMTreeTrace
TraceRestoreSession
m ()
checkTopLevelDirLayout
HasFS m h -> HasCallStack => FsPath -> m ()
forall (m :: * -> *) h. HasFS m h -> HasCallStack => FsPath -> m ()
FS.removeDirectoryRecursive HasFS m h
hfs FsPath
activeDirPath
m () -> m () -> m ()
forall a b. m a -> m b -> m a
forall (m :: * -> *) a b. MonadThrow m => m a -> m b -> m a
`finally` HasFS m h -> HasCallStack => Bool -> FsPath -> m ()
forall (m :: * -> *) h.
HasFS m h -> HasCallStack => Bool -> FsPath -> m ()
FS.createDirectoryIfMissing HasFS m h
hfs Bool
False FsPath
activeDirPath
m ()
checkActiveDirLayout
m ()
checkSnapshotsDirLayout
LockFileHandle m -> m (Session m h)
mkSession LockFileHandle m
sessionFileLock
checkTopLevelDirLayout :: m ()
checkTopLevelDirLayout = do
HasFS m h -> HasCallStack => FsPath -> m Bool
forall (m :: * -> *) h.
HasFS m h -> HasCallStack => FsPath -> m Bool
FS.doesDirectoryExist HasFS m h
hfs FsPath
activeDirPath m Bool -> (Bool -> m ()) -> m ()
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \Bool
b ->
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
b (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ SessionDirCorruptedError -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (FsErrorPath -> SessionDirCorruptedError
ErrSessionDirCorrupted (HasFS m h -> FsPath -> FsErrorPath
forall (m :: * -> *) h. HasFS m h -> FsPath -> FsErrorPath
FS.mkFsErrorPath HasFS m h
hfs FsPath
activeDirPath))
HasFS m h -> HasCallStack => FsPath -> m Bool
forall (m :: * -> *) h.
HasFS m h -> HasCallStack => FsPath -> m Bool
FS.doesDirectoryExist HasFS m h
hfs FsPath
snapshotsDirPath m Bool -> (Bool -> m ()) -> m ()
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \Bool
b ->
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
b (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ SessionDirCorruptedError -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (FsErrorPath -> SessionDirCorruptedError
ErrSessionDirCorrupted (HasFS m h -> FsPath -> FsErrorPath
forall (m :: * -> *) h. HasFS m h -> FsPath -> FsErrorPath
FS.mkFsErrorPath HasFS m h
hfs FsPath
snapshotsDirPath))
checkActiveDirLayout :: m ()
checkActiveDirLayout = do
Set String
contents <- HasFS m h -> HasCallStack => FsPath -> m (Set String)
forall (m :: * -> *) h.
HasFS m h -> HasCallStack => FsPath -> m (Set String)
FS.listDirectory HasFS m h
hfs FsPath
activeDirPath
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (Set String -> Bool
forall a. Set a -> Bool
Set.null Set String
contents) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ SessionDirCorruptedError -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (FsErrorPath -> SessionDirCorruptedError
ErrSessionDirCorrupted (HasFS m h -> FsPath -> FsErrorPath
forall (m :: * -> *) h. HasFS m h -> FsPath -> FsErrorPath
FS.mkFsErrorPath HasFS m h
hfs FsPath
activeDirPath))
checkSnapshotsDirLayout :: m ()
checkSnapshotsDirLayout = () -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
{-# SPECIALISE closeSession :: Session IO h -> IO () #-}
closeSession ::
(MonadMask m, MonadSTM m, MonadMVar m, PrimMonad m)
=> Session m h
-> m ()
closeSession :: forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadMVar m, PrimMonad m) =>
Session m h -> m ()
closeSession Session{RWVar m (SessionState m h)
sessionState :: forall (m :: * -> *) h. Session m h -> RWVar m (SessionState m h)
sessionState :: RWVar m (SessionState m h)
sessionState, Tracer m LSMTreeTrace
sessionTracer :: forall (m :: * -> *) h. Session m h -> Tracer m LSMTreeTrace
sessionTracer :: Tracer m LSMTreeTrace
sessionTracer} = do
Tracer m LSMTreeTrace -> LSMTreeTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith Tracer m LSMTreeTrace
sessionTracer LSMTreeTrace
TraceCloseSession
m (SessionState m h)
-> (SessionState m h -> m ())
-> (ActionRegistry m -> SessionState m h -> m (SessionState m h))
-> m ()
forall (m :: * -> *) st.
(PrimMonad m, MonadCatch m) =>
m st -> (st -> m ()) -> (ActionRegistry m -> st -> m st) -> m ()
modifyWithActionRegistry_
(RWVar m (SessionState m h) -> m (SessionState m h)
forall (m :: * -> *) a.
(MonadSTM m, MonadCatch m) =>
RWVar m a -> m a
RW.unsafeAcquireWriteAccess RWVar m (SessionState m h)
sessionState)
(STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ())
-> (SessionState m h -> STM m ()) -> SessionState m h -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. RWVar m (SessionState m h) -> SessionState m h -> STM m ()
forall (m :: * -> *) a. MonadSTM m => RWVar m a -> a -> STM m ()
RW.unsafeReleaseWriteAccess RWVar m (SessionState m h)
sessionState)
((ActionRegistry m -> SessionState m h -> m (SessionState m h))
-> m ())
-> (ActionRegistry m -> SessionState m h -> m (SessionState m h))
-> m ()
forall a b. (a -> b) -> a -> b
$ \ActionRegistry m
reg -> \case
SessionState m h
SessionClosed -> SessionState m h -> m (SessionState m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure SessionState m h
forall (m :: * -> *) h. SessionState m h
SessionClosed
SessionOpen SessionEnv m h
seshEnv -> do
Map CursorId (Cursor m h)
cursors <-
ActionRegistry m
-> m (Map CursorId (Cursor m h))
-> (Map CursorId (Cursor m h) -> m ())
-> m (Map CursorId (Cursor m h))
forall (m :: * -> *) a.
(PrimMonad m, MonadMask m, HasCallStack) =>
ActionRegistry m -> m a -> (a -> m ()) -> m a
withRollback ActionRegistry m
reg
(StrictMVar m (Map CursorId (Cursor m h))
-> Map CursorId (Cursor m h) -> m (Map CursorId (Cursor m h))
forall (m :: * -> *) a. MonadMVar m => StrictMVar m a -> a -> m a
swapMVar (SessionEnv m h -> StrictMVar m (Map CursorId (Cursor m h))
forall (m :: * -> *) h.
SessionEnv m h -> StrictMVar m (Map CursorId (Cursor m h))
sessionOpenCursors SessionEnv m h
seshEnv) Map CursorId (Cursor m h)
forall k a. Map k a
Map.empty)
(m (Map CursorId (Cursor m h)) -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m (Map CursorId (Cursor m h)) -> m ())
-> (Map CursorId (Cursor m h) -> m (Map CursorId (Cursor m h)))
-> Map CursorId (Cursor m h)
-> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StrictMVar m (Map CursorId (Cursor m h))
-> Map CursorId (Cursor m h) -> m (Map CursorId (Cursor m h))
forall (m :: * -> *) a. MonadMVar m => StrictMVar m a -> a -> m a
swapMVar (SessionEnv m h -> StrictMVar m (Map CursorId (Cursor m h))
forall (m :: * -> *) h.
SessionEnv m h -> StrictMVar m (Map CursorId (Cursor m h))
sessionOpenCursors SessionEnv m h
seshEnv))
(Cursor m h -> m ()) -> Map CursorId (Cursor m h) -> m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (ActionRegistry m -> m () -> m ()
forall (m :: * -> *).
(PrimMonad m, HasCallStack) =>
ActionRegistry m -> m () -> m ()
delayedCommit ActionRegistry m
reg (m () -> m ()) -> (Cursor m h -> m ()) -> Cursor m h -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Cursor m h -> m ()
forall (m :: * -> *) h.
(MonadMask m, MonadMVar m, MonadSTM m, PrimMonad m) =>
Cursor m h -> m ()
closeCursor) Map CursorId (Cursor m h)
cursors
Map TableId (Table m h)
tables <-
ActionRegistry m
-> m (Map TableId (Table m h))
-> (Map TableId (Table m h) -> m ())
-> m (Map TableId (Table m h))
forall (m :: * -> *) a.
(PrimMonad m, MonadMask m, HasCallStack) =>
ActionRegistry m -> m a -> (a -> m ()) -> m a
withRollback ActionRegistry m
reg
(StrictMVar m (Map TableId (Table m h))
-> Map TableId (Table m h) -> m (Map TableId (Table m h))
forall (m :: * -> *) a. MonadMVar m => StrictMVar m a -> a -> m a
swapMVar (SessionEnv m h -> StrictMVar m (Map TableId (Table m h))
forall (m :: * -> *) h.
SessionEnv m h -> StrictMVar m (Map TableId (Table m h))
sessionOpenTables SessionEnv m h
seshEnv) Map TableId (Table m h)
forall k a. Map k a
Map.empty)
(m (Map TableId (Table m h)) -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m (Map TableId (Table m h)) -> m ())
-> (Map TableId (Table m h) -> m (Map TableId (Table m h)))
-> Map TableId (Table m h)
-> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StrictMVar m (Map TableId (Table m h))
-> Map TableId (Table m h) -> m (Map TableId (Table m h))
forall (m :: * -> *) a. MonadMVar m => StrictMVar m a -> a -> m a
swapMVar (SessionEnv m h -> StrictMVar m (Map TableId (Table m h))
forall (m :: * -> *) h.
SessionEnv m h -> StrictMVar m (Map TableId (Table m h))
sessionOpenTables SessionEnv m h
seshEnv))
(Table m h -> m ()) -> Map TableId (Table m h) -> m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (ActionRegistry m -> m () -> m ()
forall (m :: * -> *).
(PrimMonad m, HasCallStack) =>
ActionRegistry m -> m () -> m ()
delayedCommit ActionRegistry m
reg (m () -> m ()) -> (Table m h -> m ()) -> Table m h -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Table m h -> m ()
forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadMVar m, PrimMonad m) =>
Table m h -> m ()
close) Map TableId (Table m h)
tables
ActionRegistry m -> m () -> m ()
forall (m :: * -> *).
(PrimMonad m, HasCallStack) =>
ActionRegistry m -> m () -> m ()
delayedCommit ActionRegistry m
reg (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ HasBlockIO m h -> HasCallStack => m ()
forall (m :: * -> *) h. HasBlockIO m h -> HasCallStack => m ()
FS.close (SessionEnv m h -> HasBlockIO m h
forall (m :: * -> *) h. SessionEnv m h -> HasBlockIO m h
sessionHasBlockIO SessionEnv m h
seshEnv)
ActionRegistry m -> m () -> m ()
forall (m :: * -> *).
(PrimMonad m, HasCallStack) =>
ActionRegistry m -> m () -> m ()
delayedCommit ActionRegistry m
reg (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ LockFileHandle m -> m ()
forall {m :: * -> *}. LockFileHandle m -> m ()
FS.hUnlock (SessionEnv m h -> LockFileHandle m
forall (m :: * -> *) h. SessionEnv m h -> LockFileHandle m
sessionLockFile SessionEnv m h
seshEnv)
SessionState m h -> m (SessionState m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure SessionState m h
forall (m :: * -> *) h. SessionState m h
SessionClosed
data Table m h = Table {
forall (m :: * -> *) h. Table m h -> TableConfig
tableConfig :: !TableConfig
, forall (m :: * -> *) h. Table m h -> RWVar m (TableState m h)
tableState :: !(RWVar m (TableState m h))
, forall (m :: * -> *) h. Table m h -> ArenaManager (PrimState m)
tableArenaManager :: !(ArenaManager (PrimState m))
, forall (m :: * -> *) h. Table m h -> Tracer m TableTrace
tableTracer :: !(Tracer m TableTrace)
, forall (m :: * -> *) h. Table m h -> TableId
tableId :: !TableId
, forall (m :: * -> *) h. Table m h -> Session m h
tableSession :: !(Session m h)
}
instance NFData (Table m h) where
rnf :: Table m h -> ()
rnf (Table TableConfig
a RWVar m (TableState m h)
b ArenaManager (PrimState m)
c Tracer m TableTrace
d TableId
e Session m h
f) =
TableConfig -> ()
forall a. NFData a => a -> ()
rnf TableConfig
a () -> () -> ()
forall a b. a -> b -> b
`seq` RWVar m (TableState m h) -> ()
forall a. NFData a => a -> ()
rnf RWVar m (TableState m h)
b () -> () -> ()
forall a b. a -> b -> b
`seq` ArenaManager (PrimState m) -> ()
forall a. NFData a => a -> ()
rnf ArenaManager (PrimState m)
c () -> () -> ()
forall a b. a -> b -> b
`seq` Tracer m TableTrace -> ()
forall a. a -> ()
rwhnf Tracer m TableTrace
d () -> () -> ()
forall a b. a -> b -> b
`seq` TableId -> ()
forall a. NFData a => a -> ()
rnf TableId
e() -> () -> ()
forall a b. a -> b -> b
`seq` Session m h -> ()
forall a. a -> ()
rwhnf Session m h
f
data TableState m h =
TableOpen !(TableEnv m h)
| TableClosed
data TableEnv m h = TableEnv {
forall (m :: * -> *) h. TableEnv m h -> SessionEnv m h
tableSessionEnv :: !(SessionEnv m h)
, forall (m :: * -> *) h. TableEnv m h -> RWVar m (TableContent m h)
tableContent :: !(RWVar m (TableContent m h))
}
{-# INLINE tableSessionRoot #-}
tableSessionRoot :: TableEnv m h -> SessionRoot
tableSessionRoot :: forall (m :: * -> *) h. TableEnv m h -> SessionRoot
tableSessionRoot = SessionEnv m h -> SessionRoot
forall (m :: * -> *) h. SessionEnv m h -> SessionRoot
sessionRoot (SessionEnv m h -> SessionRoot)
-> (TableEnv m h -> SessionEnv m h) -> TableEnv m h -> SessionRoot
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TableEnv m h -> SessionEnv m h
forall (m :: * -> *) h. TableEnv m h -> SessionEnv m h
tableSessionEnv
{-# INLINE tableHasFS #-}
tableHasFS :: TableEnv m h -> HasFS m h
tableHasFS :: forall (m :: * -> *) h. TableEnv m h -> HasFS m h
tableHasFS = SessionEnv m h -> HasFS m h
forall (m :: * -> *) h. SessionEnv m h -> HasFS m h
sessionHasFS (SessionEnv m h -> HasFS m h)
-> (TableEnv m h -> SessionEnv m h) -> TableEnv m h -> HasFS m h
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TableEnv m h -> SessionEnv m h
forall (m :: * -> *) h. TableEnv m h -> SessionEnv m h
tableSessionEnv
{-# INLINE tableHasBlockIO #-}
tableHasBlockIO :: TableEnv m h -> HasBlockIO m h
tableHasBlockIO :: forall (m :: * -> *) h. TableEnv m h -> HasBlockIO m h
tableHasBlockIO = SessionEnv m h -> HasBlockIO m h
forall (m :: * -> *) h. SessionEnv m h -> HasBlockIO m h
sessionHasBlockIO (SessionEnv m h -> HasBlockIO m h)
-> (TableEnv m h -> SessionEnv m h)
-> TableEnv m h
-> HasBlockIO m h
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TableEnv m h -> SessionEnv m h
forall (m :: * -> *) h. TableEnv m h -> SessionEnv m h
tableSessionEnv
{-# INLINE tableSessionUniqCounter #-}
tableSessionUniqCounter :: TableEnv m h -> UniqCounter m
tableSessionUniqCounter :: forall (m :: * -> *) h. TableEnv m h -> UniqCounter m
tableSessionUniqCounter = SessionEnv m h -> UniqCounter m
forall (m :: * -> *) h. SessionEnv m h -> UniqCounter m
sessionUniqCounter (SessionEnv m h -> UniqCounter m)
-> (TableEnv m h -> SessionEnv m h)
-> TableEnv m h
-> UniqCounter m
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TableEnv m h -> SessionEnv m h
forall (m :: * -> *) h. TableEnv m h -> SessionEnv m h
tableSessionEnv
{-# INLINE tableSessionUntrackTable #-}
{-# SPECIALISE tableSessionUntrackTable :: TableId -> TableEnv IO h -> IO () #-}
tableSessionUntrackTable :: MonadMVar m => TableId -> TableEnv m h -> m ()
tableSessionUntrackTable :: forall (m :: * -> *) h.
MonadMVar m =>
TableId -> TableEnv m h -> m ()
tableSessionUntrackTable TableId
tableId TableEnv m h
tEnv =
StrictMVar m (Map TableId (Table m h))
-> (Map TableId (Table m h) -> m (Map TableId (Table m h))) -> m ()
forall (m :: * -> *) a.
MonadMVar m =>
StrictMVar m a -> (a -> m a) -> m ()
modifyMVar_ (SessionEnv m h -> StrictMVar m (Map TableId (Table m h))
forall (m :: * -> *) h.
SessionEnv m h -> StrictMVar m (Map TableId (Table m h))
sessionOpenTables (TableEnv m h -> SessionEnv m h
forall (m :: * -> *) h. TableEnv m h -> SessionEnv m h
tableSessionEnv TableEnv m h
tEnv)) ((Map TableId (Table m h) -> m (Map TableId (Table m h))) -> m ())
-> (Map TableId (Table m h) -> m (Map TableId (Table m h))) -> m ()
forall a b. (a -> b) -> a -> b
$ Map TableId (Table m h) -> m (Map TableId (Table m h))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Map TableId (Table m h) -> m (Map TableId (Table m h)))
-> (Map TableId (Table m h) -> Map TableId (Table m h))
-> Map TableId (Table m h)
-> m (Map TableId (Table m h))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TableId -> Map TableId (Table m h) -> Map TableId (Table m h)
forall k a. Ord k => k -> Map k a -> Map k a
Map.delete TableId
tableId
data TableClosedError
= ErrTableClosed
deriving stock (Int -> TableClosedError -> ShowS
[TableClosedError] -> ShowS
TableClosedError -> String
(Int -> TableClosedError -> ShowS)
-> (TableClosedError -> String)
-> ([TableClosedError] -> ShowS)
-> Show TableClosedError
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> TableClosedError -> ShowS
showsPrec :: Int -> TableClosedError -> ShowS
$cshow :: TableClosedError -> String
show :: TableClosedError -> String
$cshowList :: [TableClosedError] -> ShowS
showList :: [TableClosedError] -> ShowS
Show, TableClosedError -> TableClosedError -> Bool
(TableClosedError -> TableClosedError -> Bool)
-> (TableClosedError -> TableClosedError -> Bool)
-> Eq TableClosedError
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: TableClosedError -> TableClosedError -> Bool
== :: TableClosedError -> TableClosedError -> Bool
$c/= :: TableClosedError -> TableClosedError -> Bool
/= :: TableClosedError -> TableClosedError -> Bool
Eq)
deriving anyclass (Show TableClosedError
Typeable TableClosedError
(Typeable TableClosedError, Show TableClosedError) =>
(TableClosedError -> SomeException)
-> (SomeException -> Maybe TableClosedError)
-> (TableClosedError -> String)
-> Exception TableClosedError
SomeException -> Maybe TableClosedError
TableClosedError -> String
TableClosedError -> SomeException
forall e.
(Typeable e, Show e) =>
(e -> SomeException)
-> (SomeException -> Maybe e) -> (e -> String) -> Exception e
$ctoException :: TableClosedError -> SomeException
toException :: TableClosedError -> SomeException
$cfromException :: SomeException -> Maybe TableClosedError
fromException :: SomeException -> Maybe TableClosedError
$cdisplayException :: TableClosedError -> String
displayException :: TableClosedError -> String
Exception)
{-# INLINE withOpenTable #-}
{-# SPECIALISE withOpenTable ::
Table IO h
-> (TableEnv IO h -> IO a)
-> IO a #-}
withOpenTable ::
(MonadSTM m, MonadThrow m)
=> Table m h
-> (TableEnv m h -> m a)
-> m a
withOpenTable :: forall (m :: * -> *) h a.
(MonadSTM m, MonadThrow m) =>
Table m h -> (TableEnv m h -> m a) -> m a
withOpenTable Table m h
t TableEnv m h -> m a
action = RWVar m (TableState m h) -> (TableState m h -> m a) -> m a
forall (m :: * -> *) a b.
(MonadSTM m, MonadThrow m) =>
RWVar m a -> (a -> m b) -> m b
RW.withReadAccess (Table m h -> RWVar m (TableState m h)
forall (m :: * -> *) h. Table m h -> RWVar m (TableState m h)
tableState Table m h
t) ((TableState m h -> m a) -> m a) -> (TableState m h -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ \case
TableState m h
TableClosed -> TableClosedError -> m a
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO TableClosedError
ErrTableClosed
TableOpen TableEnv m h
tEnv -> TableEnv m h -> m a
action TableEnv m h
tEnv
{-# SPECIALISE withTable ::
Session IO h
-> TableConfig
-> (Table IO h -> IO a)
-> IO a #-}
withTable ::
(MonadMask m, MonadSTM m, MonadMVar m, PrimMonad m)
=> Session m h
-> TableConfig
-> (Table m h -> m a)
-> m a
withTable :: forall (m :: * -> *) h a.
(MonadMask m, MonadSTM m, MonadMVar m, PrimMonad m) =>
Session m h -> TableConfig -> (Table m h -> m a) -> m a
withTable Session m h
sesh TableConfig
conf = m (Table m h) -> (Table m h -> m ()) -> (Table m h -> m a) -> m a
forall a b c. m a -> (a -> m b) -> (a -> m c) -> m c
forall (m :: * -> *) a b c.
MonadThrow m =>
m a -> (a -> m b) -> (a -> m c) -> m c
bracket (Session m h -> TableConfig -> m (Table m h)
forall (m :: * -> *) h.
(MonadSTM m, MonadMVar m, PrimMonad m, MonadMask m) =>
Session m h -> TableConfig -> m (Table m h)
new Session m h
sesh TableConfig
conf) Table m h -> m ()
forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadMVar m, PrimMonad m) =>
Table m h -> m ()
close
{-# SPECIALISE new ::
Session IO h
-> TableConfig
-> IO (Table IO h) #-}
new ::
(MonadSTM m, MonadMVar m, PrimMonad m, MonadMask m)
=> Session m h
-> TableConfig
-> m (Table m h)
new :: forall (m :: * -> *) h.
(MonadSTM m, MonadMVar m, PrimMonad m, MonadMask m) =>
Session m h -> TableConfig -> m (Table m h)
new Session m h
sesh TableConfig
conf = do
Tracer m LSMTreeTrace -> LSMTreeTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith (Session m h -> Tracer m LSMTreeTrace
forall (m :: * -> *) h. Session m h -> Tracer m LSMTreeTrace
sessionTracer Session m h
sesh) LSMTreeTrace
TraceNewTable
Session m h -> (SessionEnv m h -> m (Table m h)) -> m (Table m h)
forall (m :: * -> *) h a.
(MonadSTM m, MonadThrow m) =>
Session m h -> (SessionEnv m h -> m a) -> m a
withOpenSession Session m h
sesh ((SessionEnv m h -> m (Table m h)) -> m (Table m h))
-> (SessionEnv m h -> m (Table m h)) -> m (Table m h)
forall a b. (a -> b) -> a -> b
$ \SessionEnv m h
seshEnv ->
(ActionRegistry m -> m (Table m h)) -> m (Table m h)
forall (m :: * -> *) a.
(PrimMonad m, MonadCatch m) =>
(ActionRegistry m -> m a) -> m a
withActionRegistry ((ActionRegistry m -> m (Table m h)) -> m (Table m h))
-> (ActionRegistry m -> m (Table m h)) -> m (Table m h)
forall a b. (a -> b) -> a -> b
$ \ActionRegistry m
reg -> do
ArenaManager (PrimState m)
am <- m (ArenaManager (PrimState m))
forall (m :: * -> *). PrimMonad m => m (ArenaManager (PrimState m))
newArenaManager
TableContent m h
tc <- SessionEnv m h -> ActionRegistry m -> m (TableContent m h)
forall (m :: * -> *) h.
(PrimMonad m, MonadMask m, MonadMVar m) =>
SessionEnv m h -> ActionRegistry m -> m (TableContent m h)
newEmptyTableContent SessionEnv m h
seshEnv ActionRegistry m
reg
ActionRegistry m
-> Session m h
-> SessionEnv m h
-> TableConfig
-> ArenaManager (PrimState m)
-> TableContent m h
-> m (Table m h)
forall (m :: * -> *) h.
(MonadSTM m, MonadMVar m, PrimMonad m) =>
ActionRegistry m
-> Session m h
-> SessionEnv m h
-> TableConfig
-> ArenaManager (PrimState m)
-> TableContent m h
-> m (Table m h)
newWith ActionRegistry m
reg Session m h
sesh SessionEnv m h
seshEnv TableConfig
conf ArenaManager (PrimState m)
am TableContent m h
tc
{-# SPECIALISE newEmptyTableContent ::
SessionEnv IO h
-> ActionRegistry IO
-> IO (TableContent IO h) #-}
newEmptyTableContent ::
(PrimMonad m, MonadMask m, MonadMVar m)
=> SessionEnv m h
-> ActionRegistry m
-> m (TableContent m h)
newEmptyTableContent :: forall (m :: * -> *) h.
(PrimMonad m, MonadMask m, MonadMVar m) =>
SessionEnv m h -> ActionRegistry m -> m (TableContent m h)
newEmptyTableContent SessionEnv m h
seshEnv ActionRegistry m
reg = do
FsPath
blobpath <- SessionRoot -> Unique -> FsPath
Paths.tableBlobPath (SessionEnv m h -> SessionRoot
forall (m :: * -> *) h. SessionEnv m h -> SessionRoot
sessionRoot SessionEnv m h
seshEnv) (Unique -> FsPath) -> m Unique -> m FsPath
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
UniqCounter m -> m Unique
forall (m :: * -> *). PrimMonad m => UniqCounter m -> m Unique
incrUniqCounter (SessionEnv m h -> UniqCounter m
forall (m :: * -> *) h. SessionEnv m h -> UniqCounter m
sessionUniqCounter SessionEnv m h
seshEnv)
let tableWriteBuffer :: WriteBuffer
tableWriteBuffer = WriteBuffer
WB.empty
Ref (WriteBufferBlobs m h)
tableWriteBufferBlobs
<- ActionRegistry m
-> m (Ref (WriteBufferBlobs m h))
-> (Ref (WriteBufferBlobs m h) -> m ())
-> m (Ref (WriteBufferBlobs m h))
forall (m :: * -> *) a.
(PrimMonad m, MonadMask m, HasCallStack) =>
ActionRegistry m -> m a -> (a -> m ()) -> m a
withRollback ActionRegistry m
reg
(HasFS m h -> FsPath -> m (Ref (WriteBufferBlobs m h))
forall (m :: * -> *) h.
(PrimMonad m, MonadMask m) =>
HasFS m h -> FsPath -> m (Ref (WriteBufferBlobs m h))
WBB.new (SessionEnv m h -> HasFS m h
forall (m :: * -> *) h. SessionEnv m h -> HasFS m h
sessionHasFS SessionEnv m h
seshEnv) FsPath
blobpath)
Ref (WriteBufferBlobs m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef
let tableLevels :: Vector a
tableLevels = Vector a
forall a. Vector a
V.empty
LevelsCache m h
tableCache <- ActionRegistry m -> Levels m h -> m (LevelsCache m h)
forall (m :: * -> *) h.
(PrimMonad m, MonadMVar m, MonadMask m) =>
ActionRegistry m -> Levels m h -> m (LevelsCache m h)
mkLevelsCache ActionRegistry m
reg Levels m h
forall a. Vector a
tableLevels
TableContent m h -> m (TableContent m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure TableContent {
WriteBuffer
tableWriteBuffer :: WriteBuffer
tableWriteBuffer :: WriteBuffer
tableWriteBuffer
, Ref (WriteBufferBlobs m h)
tableWriteBufferBlobs :: Ref (WriteBufferBlobs m h)
tableWriteBufferBlobs :: Ref (WriteBufferBlobs m h)
tableWriteBufferBlobs
, Levels m h
forall a. Vector a
tableLevels :: forall a. Vector a
tableLevels :: Levels m h
tableLevels
, LevelsCache m h
tableCache :: LevelsCache m h
tableCache :: LevelsCache m h
tableCache
, tableUnionLevel :: UnionLevel m h
tableUnionLevel = UnionLevel m h
forall (m :: * -> *) h. UnionLevel m h
NoUnion
}
{-# SPECIALISE newWith ::
ActionRegistry IO
-> Session IO h
-> SessionEnv IO h
-> TableConfig
-> ArenaManager RealWorld
-> TableContent IO h
-> IO (Table IO h) #-}
newWith ::
(MonadSTM m, MonadMVar m, PrimMonad m)
=> ActionRegistry m
-> Session m h
-> SessionEnv m h
-> TableConfig
-> ArenaManager (PrimState m)
-> TableContent m h
-> m (Table m h)
newWith :: forall (m :: * -> *) h.
(MonadSTM m, MonadMVar m, PrimMonad m) =>
ActionRegistry m
-> Session m h
-> SessionEnv m h
-> TableConfig
-> ArenaManager (PrimState m)
-> TableContent m h
-> m (Table m h)
newWith ActionRegistry m
reg Session m h
sesh SessionEnv m h
seshEnv TableConfig
conf !ArenaManager (PrimState m)
am !TableContent m h
tc = do
TableId
tableId <- Unique -> TableId
uniqueToTableId (Unique -> TableId) -> m Unique -> m TableId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> UniqCounter m -> m Unique
forall (m :: * -> *). PrimMonad m => UniqCounter m -> m Unique
incrUniqCounter (SessionEnv m h -> UniqCounter m
forall (m :: * -> *) h. SessionEnv m h -> UniqCounter m
sessionUniqCounter SessionEnv m h
seshEnv)
let tr :: Tracer m TableTrace
tr = TableId -> TableTrace -> LSMTreeTrace
TraceTable TableId
tableId (TableTrace -> LSMTreeTrace)
-> Tracer m LSMTreeTrace -> Tracer m TableTrace
forall a' a. (a' -> a) -> Tracer m a -> Tracer m a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
`contramap` Session m h -> Tracer m LSMTreeTrace
forall (m :: * -> *) h. Session m h -> Tracer m LSMTreeTrace
sessionTracer Session m h
sesh
Tracer m TableTrace -> TableTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith Tracer m TableTrace
tr (TableTrace -> m ()) -> TableTrace -> m ()
forall a b. (a -> b) -> a -> b
$ TableConfig -> TableTrace
TraceCreateTable TableConfig
conf
RWVar m (TableContent m h)
contentVar <- TableContent m h -> m (RWVar m (TableContent m h))
forall (m :: * -> *) a. MonadSTM m => a -> m (RWVar m a)
RW.new (TableContent m h -> m (RWVar m (TableContent m h)))
-> TableContent m h -> m (RWVar m (TableContent m h))
forall a b. (a -> b) -> a -> b
$ TableContent m h
tc
RWVar m (TableState m h)
tableVar <- TableState m h -> m (RWVar m (TableState m h))
forall (m :: * -> *) a. MonadSTM m => a -> m (RWVar m a)
RW.new (TableState m h -> m (RWVar m (TableState m h)))
-> TableState m h -> m (RWVar m (TableState m h))
forall a b. (a -> b) -> a -> b
$ TableEnv m h -> TableState m h
forall (m :: * -> *) h. TableEnv m h -> TableState m h
TableOpen (TableEnv m h -> TableState m h) -> TableEnv m h -> TableState m h
forall a b. (a -> b) -> a -> b
$ TableEnv {
tableSessionEnv :: SessionEnv m h
tableSessionEnv = SessionEnv m h
seshEnv
, tableContent :: RWVar m (TableContent m h)
tableContent = RWVar m (TableContent m h)
contentVar
}
let !t :: Table m h
t = TableConfig
-> RWVar m (TableState m h)
-> ArenaManager (PrimState m)
-> Tracer m TableTrace
-> TableId
-> Session m h
-> Table m h
forall (m :: * -> *) h.
TableConfig
-> RWVar m (TableState m h)
-> ArenaManager (PrimState m)
-> Tracer m TableTrace
-> TableId
-> Session m h
-> Table m h
Table TableConfig
conf RWVar m (TableState m h)
tableVar ArenaManager (PrimState m)
am Tracer m TableTrace
tr TableId
tableId Session m h
sesh
ActionRegistry m -> m () -> m ()
forall (m :: * -> *).
(PrimMonad m, HasCallStack) =>
ActionRegistry m -> m () -> m ()
delayedCommit ActionRegistry m
reg (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
StrictMVar m (Map TableId (Table m h))
-> (Map TableId (Table m h) -> m (Map TableId (Table m h))) -> m ()
forall (m :: * -> *) a.
MonadMVar m =>
StrictMVar m a -> (a -> m a) -> m ()
modifyMVar_ (SessionEnv m h -> StrictMVar m (Map TableId (Table m h))
forall (m :: * -> *) h.
SessionEnv m h -> StrictMVar m (Map TableId (Table m h))
sessionOpenTables SessionEnv m h
seshEnv) ((Map TableId (Table m h) -> m (Map TableId (Table m h))) -> m ())
-> (Map TableId (Table m h) -> m (Map TableId (Table m h))) -> m ()
forall a b. (a -> b) -> a -> b
$
Map TableId (Table m h) -> m (Map TableId (Table m h))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Map TableId (Table m h) -> m (Map TableId (Table m h)))
-> (Map TableId (Table m h) -> Map TableId (Table m h))
-> Map TableId (Table m h)
-> m (Map TableId (Table m h))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TableId
-> Table m h -> Map TableId (Table m h) -> Map TableId (Table m h)
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert TableId
tableId Table m h
t
Table m h -> m (Table m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Table m h -> m (Table m h)) -> Table m h -> m (Table m h)
forall a b. (a -> b) -> a -> b
$! Table m h
t
{-# SPECIALISE close :: Table IO h -> IO () #-}
close ::
(MonadMask m, MonadSTM m, MonadMVar m, PrimMonad m)
=> Table m h
-> m ()
close :: forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadMVar m, PrimMonad m) =>
Table m h -> m ()
close Table m h
t = do
Tracer m TableTrace -> TableTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith (Table m h -> Tracer m TableTrace
forall (m :: * -> *) h. Table m h -> Tracer m TableTrace
tableTracer Table m h
t) TableTrace
TraceCloseTable
m (TableState m h)
-> (TableState m h -> m ())
-> (ActionRegistry m -> TableState m h -> m (TableState m h))
-> m ()
forall (m :: * -> *) st.
(PrimMonad m, MonadCatch m) =>
m st -> (st -> m ()) -> (ActionRegistry m -> st -> m st) -> m ()
modifyWithActionRegistry_
(RWVar m (TableState m h) -> m (TableState m h)
forall (m :: * -> *) a.
(MonadSTM m, MonadCatch m) =>
RWVar m a -> m a
RW.unsafeAcquireWriteAccess (Table m h -> RWVar m (TableState m h)
forall (m :: * -> *) h. Table m h -> RWVar m (TableState m h)
tableState Table m h
t))
(STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ())
-> (TableState m h -> STM m ()) -> TableState m h -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. RWVar m (TableState m h) -> TableState m h -> STM m ()
forall (m :: * -> *) a. MonadSTM m => RWVar m a -> a -> STM m ()
RW.unsafeReleaseWriteAccess (Table m h -> RWVar m (TableState m h)
forall (m :: * -> *) h. Table m h -> RWVar m (TableState m h)
tableState Table m h
t)) ((ActionRegistry m -> TableState m h -> m (TableState m h))
-> m ())
-> (ActionRegistry m -> TableState m h -> m (TableState m h))
-> m ()
forall a b. (a -> b) -> a -> b
$ \ActionRegistry m
reg -> \case
TableState m h
TableClosed -> TableState m h -> m (TableState m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure TableState m h
forall (m :: * -> *) h. TableState m h
TableClosed
TableOpen TableEnv m h
tEnv -> do
ActionRegistry m -> m () -> m ()
forall (m :: * -> *).
(PrimMonad m, HasCallStack) =>
ActionRegistry m -> m () -> m ()
delayedCommit ActionRegistry m
reg (TableId -> TableEnv m h -> m ()
forall (m :: * -> *) h.
MonadMVar m =>
TableId -> TableEnv m h -> m ()
tableSessionUntrackTable (Table m h -> TableId
forall (m :: * -> *) h. Table m h -> TableId
tableId Table m h
t) TableEnv m h
tEnv)
RWVar m (TableContent m h)
-> (TableContent m h -> m (TableContent m h)) -> m ()
forall (m :: * -> *) a.
(MonadSTM m, MonadCatch m) =>
RWVar m a -> (a -> m a) -> m ()
RW.withWriteAccess_ (TableEnv m h -> RWVar m (TableContent m h)
forall (m :: * -> *) h. TableEnv m h -> RWVar m (TableContent m h)
tableContent TableEnv m h
tEnv) ((TableContent m h -> m (TableContent m h)) -> m ())
-> (TableContent m h -> m (TableContent m h)) -> m ()
forall a b. (a -> b) -> a -> b
$ \TableContent m h
tc -> do
ActionRegistry m -> TableContent m h -> m ()
forall (m :: * -> *) h.
(PrimMonad m, MonadMask m) =>
ActionRegistry m -> TableContent m h -> m ()
releaseTableContent ActionRegistry m
reg TableContent m h
tc
TableContent m h -> m (TableContent m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure TableContent m h
tc
TableState m h -> m (TableState m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure TableState m h
forall (m :: * -> *) h. TableState m h
TableClosed
{-# SPECIALISE lookups ::
ResolveSerialisedValue
-> V.Vector SerialisedKey
-> Table IO h
-> IO (V.Vector (Maybe (Entry SerialisedValue (WeakBlobRef IO h)))) #-}
lookups ::
(MonadAsync m, MonadMask m, MonadMVar m, MonadST m)
=> ResolveSerialisedValue
-> V.Vector SerialisedKey
-> Table m h
-> m (V.Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
lookups :: forall (m :: * -> *) h.
(MonadAsync m, MonadMask m, MonadMVar m, MonadST m) =>
ResolveSerialisedValue
-> Vector SerialisedKey
-> Table m h
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
lookups ResolveSerialisedValue
resolve Vector SerialisedKey
ks Table m h
t = do
Tracer m TableTrace -> TableTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith (Table m h -> Tracer m TableTrace
forall (m :: * -> *) h. Table m h -> Tracer m TableTrace
tableTracer Table m h
t) (TableTrace -> m ()) -> TableTrace -> m ()
forall a b. (a -> b) -> a -> b
$ Int -> TableTrace
TraceLookups (Vector SerialisedKey -> Int
forall a. Vector a -> Int
V.length Vector SerialisedKey
ks)
Table m h
-> (TableEnv m h
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
forall (m :: * -> *) h a.
(MonadSTM m, MonadThrow m) =>
Table m h -> (TableEnv m h -> m a) -> m a
withOpenTable Table m h
t ((TableEnv m h
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
-> (TableEnv m h
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
forall a b. (a -> b) -> a -> b
$ \TableEnv m h
tEnv ->
RWVar m (TableContent m h)
-> (TableContent m h
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
forall (m :: * -> *) a b.
(MonadSTM m, MonadThrow m) =>
RWVar m a -> (a -> m b) -> m b
RW.withReadAccess (TableEnv m h -> RWVar m (TableContent m h)
forall (m :: * -> *) h. TableEnv m h -> RWVar m (TableContent m h)
tableContent TableEnv m h
tEnv) ((TableContent m h
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
-> (TableContent m h
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
forall a b. (a -> b) -> a -> b
$ \TableContent m h
tableContent -> do
case TableContent m h -> UnionLevel m h
forall (m :: * -> *) h. TableContent m h -> UnionLevel m h
tableUnionLevel TableContent m h
tableContent of
UnionLevel m h
NoUnion -> TableEnv m h
-> TableContent m h
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
regularLevelLookups TableEnv m h
tEnv TableContent m h
tableContent
Union Ref (MergingTree m h)
tree -> do
Ref (MergingTree m h) -> m Bool
forall (m :: * -> *) h.
MonadMVar m =>
Ref (MergingTree m h) -> m Bool
isStructurallyEmpty Ref (MergingTree m h)
tree m Bool
-> (Bool
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Bool
True -> TableEnv m h
-> TableContent m h
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
regularLevelLookups TableEnv m h
tEnv TableContent m h
tableContent
Bool
False ->
(ActionRegistry m
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
forall (m :: * -> *) a.
(PrimMonad m, MonadCatch m) =>
(ActionRegistry m -> m a) -> m a
withActionRegistry ((ActionRegistry m
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
-> (ActionRegistry m
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
forall a b. (a -> b) -> a -> b
$ \ActionRegistry m
reg -> do
Async m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
regularResult <-
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
-> m (Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
forall a. m a -> m (Async m a)
forall (m :: * -> *) a. MonadAsync m => m a -> m (Async m a)
Async.async (m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
-> m (Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))))
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
-> m (Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
forall a b. (a -> b) -> a -> b
$ TableEnv m h
-> TableContent m h
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
regularLevelLookups TableEnv m h
tEnv TableContent m h
tableContent
LookupTree (Vector (Ref (Run m h)))
treeBatches <- ActionRegistry m
-> Ref (MergingTree m h) -> m (LookupTree (Vector (Ref (Run m h))))
forall (m :: * -> *) h.
(PrimMonad m, MonadMVar m, MonadMask m) =>
ActionRegistry m
-> Ref (MergingTree m h) -> m (LookupTree (Vector (Ref (Run m h))))
MT.buildLookupTree ActionRegistry m
reg Ref (MergingTree m h)
tree
LookupTree
(Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
treeResults <- LookupTree (Vector (Ref (Run m h)))
-> (Vector (Ref (Run m h))
-> m (Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))))
-> m (LookupTree
(Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))))
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM LookupTree (Vector (Ref (Run m h)))
treeBatches ((Vector (Ref (Run m h))
-> m (Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))))
-> m (LookupTree
(Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))))
-> (Vector (Ref (Run m h))
-> m (Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))))
-> m (LookupTree
(Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))))
forall a b. (a -> b) -> a -> b
$ \Vector (Ref (Run m h))
runs ->
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
-> m (Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
forall a. m a -> m (Async m a)
forall (m :: * -> *) a. MonadAsync m => m a -> m (Async m a)
Async.async (m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
-> m (Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))))
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
-> m (Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
forall a b. (a -> b) -> a -> b
$ TableEnv m h
-> Vector (Ref (Run m h))
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
treeBatchLookups TableEnv m h
tEnv Vector (Ref (Run m h))
runs
Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))
res <- ResolveSerialisedValue
-> LookupTree
(Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
forall (m :: * -> *) h.
MonadAsync m =>
ResolveSerialisedValue
-> LookupTree (Async m (LookupAcc m h)) -> m (LookupAcc m h)
MT.foldLookupTree ResolveSerialisedValue
resolve (LookupTree
(Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
-> LookupTree
(Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
forall a b. (a -> b) -> a -> b
$
TreeMergeType
-> Vector
(LookupTree
(Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))))
-> LookupTree
(Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
forall a. TreeMergeType -> Vector (LookupTree a) -> LookupTree a
MT.mkLookupNode TreeMergeType
MR.MergeLevel (Vector
(LookupTree
(Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))))
-> LookupTree
(Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))))
-> Vector
(LookupTree
(Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))))
-> LookupTree
(Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
forall a b. (a -> b) -> a -> b
$ [LookupTree
(Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))]
-> Vector
(LookupTree
(Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))))
forall a. [a] -> Vector a
V.fromList
[ Async m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
-> LookupTree
(Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
forall a. a -> LookupTree a
MT.LookupBatch Async m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
regularResult
, LookupTree
(Async
m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
treeResults
]
ActionRegistry m -> LookupTree (Vector (Ref (Run m h))) -> m ()
forall (m :: * -> *) h.
(PrimMonad m, MonadMask m) =>
ActionRegistry m -> LookupTree (Vector (Ref (Run m h))) -> m ()
MT.releaseLookupTree ActionRegistry m
reg LookupTree (Vector (Ref (Run m h)))
treeBatches
Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))
res
where
regularLevelLookups :: TableEnv m h
-> TableContent m h
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
regularLevelLookups TableEnv m h
tEnv TableContent m h
tableContent = do
let !cache :: LevelsCache m h
cache = TableContent m h -> LevelsCache m h
forall (m :: * -> *) h. TableContent m h -> LevelsCache m h
tableCache TableContent m h
tableContent
HasBlockIO m h
-> ArenaManager (PrimState m)
-> ResolveSerialisedValue
-> WriteBuffer
-> Ref (WriteBufferBlobs m h)
-> Vector (Ref (Run m h))
-> Vector (Bloom SerialisedKey)
-> Vector Index
-> Vector (Handle h)
-> Vector SerialisedKey
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
forall (m :: * -> *) h.
(MonadThrow m, MonadST m) =>
HasBlockIO m h
-> ArenaManager (PrimState m)
-> ResolveSerialisedValue
-> WriteBuffer
-> Ref (WriteBufferBlobs m h)
-> Vector (Ref (Run m h))
-> Vector (Bloom SerialisedKey)
-> Vector Index
-> Vector (Handle h)
-> Vector SerialisedKey
-> m (LookupAcc m h)
lookupsIOWithWriteBuffer
(TableEnv m h -> HasBlockIO m h
forall (m :: * -> *) h. TableEnv m h -> HasBlockIO m h
tableHasBlockIO TableEnv m h
tEnv)
(Table m h -> ArenaManager (PrimState m)
forall (m :: * -> *) h. Table m h -> ArenaManager (PrimState m)
tableArenaManager Table m h
t)
ResolveSerialisedValue
resolve
(TableContent m h -> WriteBuffer
forall (m :: * -> *) h. TableContent m h -> WriteBuffer
tableWriteBuffer TableContent m h
tableContent)
(TableContent m h -> Ref (WriteBufferBlobs m h)
forall (m :: * -> *) h.
TableContent m h -> Ref (WriteBufferBlobs m h)
tableWriteBufferBlobs TableContent m h
tableContent)
(LevelsCache m h -> Vector (Ref (Run m h))
forall (m :: * -> *) h. LevelsCache m h -> Vector (Ref (Run m h))
cachedRuns LevelsCache m h
cache)
(LevelsCache m h -> Vector (Bloom SerialisedKey)
forall (m :: * -> *) h.
LevelsCache m h -> Vector (Bloom SerialisedKey)
cachedFilters LevelsCache m h
cache)
(LevelsCache m h -> Vector Index
forall (m :: * -> *) h. LevelsCache m h -> Vector Index
cachedIndexes LevelsCache m h
cache)
(LevelsCache m h -> Vector (Handle h)
forall (m :: * -> *) h. LevelsCache m h -> Vector (Handle h)
cachedKOpsFiles LevelsCache m h
cache)
Vector SerialisedKey
ks
treeBatchLookups :: TableEnv m h
-> Vector (Ref (Run m h))
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
treeBatchLookups TableEnv m h
tEnv Vector (Ref (Run m h))
runs =
HasBlockIO m h
-> ArenaManager (PrimState m)
-> ResolveSerialisedValue
-> Vector (Ref (Run m h))
-> Vector (Bloom SerialisedKey)
-> Vector Index
-> Vector (Handle h)
-> Vector SerialisedKey
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
forall (m :: * -> *) h.
(MonadThrow m, MonadST m) =>
HasBlockIO m h
-> ArenaManager (PrimState m)
-> ResolveSerialisedValue
-> Vector (Ref (Run m h))
-> Vector (Bloom SerialisedKey)
-> Vector Index
-> Vector (Handle h)
-> Vector SerialisedKey
-> m (LookupAcc m h)
lookupsIO
(TableEnv m h -> HasBlockIO m h
forall (m :: * -> *) h. TableEnv m h -> HasBlockIO m h
tableHasBlockIO TableEnv m h
tEnv)
(Table m h -> ArenaManager (PrimState m)
forall (m :: * -> *) h. Table m h -> ArenaManager (PrimState m)
tableArenaManager Table m h
t)
ResolveSerialisedValue
resolve
Vector (Ref (Run m h))
runs
((Ref (Run m h) -> Bloom SerialisedKey)
-> Vector (Ref (Run m h)) -> Vector (Bloom SerialisedKey)
forall a b. (a -> b) -> Vector a -> Vector b
V.mapStrict (\(DeRef Run m h
r) -> Run m h -> Bloom SerialisedKey
forall (m :: * -> *) h. Run m h -> Bloom SerialisedKey
Run.runFilter Run m h
r) Vector (Ref (Run m h))
runs)
((Ref (Run m h) -> Index) -> Vector (Ref (Run m h)) -> Vector Index
forall a b. (a -> b) -> Vector a -> Vector b
V.mapStrict (\(DeRef Run m h
r) -> Run m h -> Index
forall (m :: * -> *) h. Run m h -> Index
Run.runIndex Run m h
r) Vector (Ref (Run m h))
runs)
((Ref (Run m h) -> Handle h)
-> Vector (Ref (Run m h)) -> Vector (Handle h)
forall a b. (a -> b) -> Vector a -> Vector b
V.mapStrict (\(DeRef Run m h
r) -> Run m h -> Handle h
forall (m :: * -> *) h. Run m h -> Handle h
Run.runKOpsFile Run m h
r) Vector (Ref (Run m h))
runs)
Vector SerialisedKey
ks
{-# SPECIALISE rangeLookup ::
ResolveSerialisedValue
-> Range SerialisedKey
-> Table IO h
-> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef IO h) -> res)
-> IO (V.Vector res) #-}
rangeLookup ::
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m)
=> ResolveSerialisedValue
-> Range SerialisedKey
-> Table m h
-> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef m h) -> res)
-> m (V.Vector res)
rangeLookup :: forall (m :: * -> *) h res.
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m) =>
ResolveSerialisedValue
-> Range SerialisedKey
-> Table m h
-> (SerialisedKey
-> SerialisedValue -> Maybe (WeakBlobRef m h) -> res)
-> m (Vector res)
rangeLookup ResolveSerialisedValue
resolve Range SerialisedKey
range Table m h
t SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef m h) -> res
fromEntry = do
Tracer m TableTrace -> TableTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith (Table m h -> Tracer m TableTrace
forall (m :: * -> *) h. Table m h -> Tracer m TableTrace
tableTracer Table m h
t) (TableTrace -> m ()) -> TableTrace -> m ()
forall a b. (a -> b) -> a -> b
$ Range SerialisedKey -> TableTrace
TraceRangeLookup Range SerialisedKey
range
case Range SerialisedKey
range of
FromToExcluding SerialisedKey
lb SerialisedKey
ub ->
OffsetKey
-> Table m h -> (Cursor m h -> m (Vector res)) -> m (Vector res)
forall (m :: * -> *) h a.
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m) =>
OffsetKey -> Table m h -> (Cursor m h -> m a) -> m a
withCursor (SerialisedKey -> OffsetKey
OffsetKey SerialisedKey
lb) Table m h
t ((Cursor m h -> m (Vector res)) -> m (Vector res))
-> (Cursor m h -> m (Vector res)) -> m (Vector res)
forall a b. (a -> b) -> a -> b
$ \Cursor m h
cursor ->
Cursor m h
-> (SerialisedKey -> Bool) -> [Vector res] -> m (Vector res)
go Cursor m h
cursor (SerialisedKey -> SerialisedKey -> Bool
forall a. Ord a => a -> a -> Bool
< SerialisedKey
ub) []
FromToIncluding SerialisedKey
lb SerialisedKey
ub ->
OffsetKey
-> Table m h -> (Cursor m h -> m (Vector res)) -> m (Vector res)
forall (m :: * -> *) h a.
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m) =>
OffsetKey -> Table m h -> (Cursor m h -> m a) -> m a
withCursor (SerialisedKey -> OffsetKey
OffsetKey SerialisedKey
lb) Table m h
t ((Cursor m h -> m (Vector res)) -> m (Vector res))
-> (Cursor m h -> m (Vector res)) -> m (Vector res)
forall a b. (a -> b) -> a -> b
$ \Cursor m h
cursor ->
Cursor m h
-> (SerialisedKey -> Bool) -> [Vector res] -> m (Vector res)
go Cursor m h
cursor (SerialisedKey -> SerialisedKey -> Bool
forall a. Ord a => a -> a -> Bool
<= SerialisedKey
ub) []
where
chunkSize :: Int
chunkSize = Int
500
go :: Cursor m h
-> (SerialisedKey -> Bool) -> [Vector res] -> m (Vector res)
go Cursor m h
cursor SerialisedKey -> Bool
isInUpperBound ![Vector res]
chunks = do
Vector res
chunk <- ResolveSerialisedValue
-> (SerialisedKey -> Bool)
-> Int
-> Cursor m h
-> (SerialisedKey
-> SerialisedValue -> Maybe (WeakBlobRef m h) -> res)
-> m (Vector res)
forall (m :: * -> *) h res.
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m) =>
ResolveSerialisedValue
-> (SerialisedKey -> Bool)
-> Int
-> Cursor m h
-> (SerialisedKey
-> SerialisedValue -> Maybe (WeakBlobRef m h) -> res)
-> m (Vector res)
readCursorWhile ResolveSerialisedValue
resolve SerialisedKey -> Bool
isInUpperBound Int
chunkSize Cursor m h
cursor SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef m h) -> res
fromEntry
let !n :: Int
n = Vector res -> Int
forall a. Vector a -> Int
V.length Vector res
chunk
if Int
n Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
chunkSize
then Cursor m h
-> (SerialisedKey -> Bool) -> [Vector res] -> m (Vector res)
go Cursor m h
cursor SerialisedKey -> Bool
isInUpperBound (Vector res
chunk Vector res -> [Vector res] -> [Vector res]
forall a. a -> [a] -> [a]
: [Vector res]
chunks)
else Vector res -> m (Vector res)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return ([Vector res] -> Vector res
forall a. [Vector a] -> Vector a
V.concat ([Vector res] -> [Vector res]
forall a. [a] -> [a]
reverse (Int -> Int -> Vector res -> Vector res
forall a. Int -> Int -> Vector a -> Vector a
V.slice Int
0 Int
n Vector res
chunk Vector res -> [Vector res] -> [Vector res]
forall a. a -> [a] -> [a]
: [Vector res]
chunks)))
{-# SPECIALISE updates ::
ResolveSerialisedValue
-> V.Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)
-> Table IO h
-> IO () #-}
updates ::
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m)
=> ResolveSerialisedValue
-> V.Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)
-> Table m h
-> m ()
updates :: forall (m :: * -> *) h.
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m) =>
ResolveSerialisedValue
-> Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)
-> Table m h
-> m ()
updates ResolveSerialisedValue
resolve Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)
es Table m h
t = do
Tracer m TableTrace -> TableTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith (Table m h -> Tracer m TableTrace
forall (m :: * -> *) h. Table m h -> Tracer m TableTrace
tableTracer Table m h
t) (TableTrace -> m ()) -> TableTrace -> m ()
forall a b. (a -> b) -> a -> b
$ Int -> TableTrace
TraceUpdates (Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob) -> Int
forall a. Vector a -> Int
V.length Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)
es)
let conf :: TableConfig
conf = Table m h -> TableConfig
forall (m :: * -> *) h. Table m h -> TableConfig
tableConfig Table m h
t
Table m h -> (TableEnv m h -> m ()) -> m ()
forall (m :: * -> *) h a.
(MonadSTM m, MonadThrow m) =>
Table m h -> (TableEnv m h -> m a) -> m a
withOpenTable Table m h
t ((TableEnv m h -> m ()) -> m ()) -> (TableEnv m h -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \TableEnv m h
tEnv -> do
let hfs :: HasFS m h
hfs = TableEnv m h -> HasFS m h
forall (m :: * -> *) h. TableEnv m h -> HasFS m h
tableHasFS TableEnv m h
tEnv
m (TableContent m h)
-> (TableContent m h -> m ())
-> (ActionRegistry m -> TableContent m h -> m (TableContent m h))
-> m ()
forall (m :: * -> *) st.
(PrimMonad m, MonadCatch m) =>
m st -> (st -> m ()) -> (ActionRegistry m -> st -> m st) -> m ()
modifyWithActionRegistry_
(RWVar m (TableContent m h) -> m (TableContent m h)
forall (m :: * -> *) a.
(MonadSTM m, MonadCatch m) =>
RWVar m a -> m a
RW.unsafeAcquireWriteAccess (TableEnv m h -> RWVar m (TableContent m h)
forall (m :: * -> *) h. TableEnv m h -> RWVar m (TableContent m h)
tableContent TableEnv m h
tEnv))
(STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ())
-> (TableContent m h -> STM m ()) -> TableContent m h -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. RWVar m (TableContent m h) -> TableContent m h -> STM m ()
forall (m :: * -> *) a. MonadSTM m => RWVar m a -> a -> STM m ()
RW.unsafeReleaseWriteAccess (TableEnv m h -> RWVar m (TableContent m h)
forall (m :: * -> *) h. TableEnv m h -> RWVar m (TableContent m h)
tableContent TableEnv m h
tEnv)) ((ActionRegistry m -> TableContent m h -> m (TableContent m h))
-> m ())
-> (ActionRegistry m -> TableContent m h -> m (TableContent m h))
-> m ()
forall a b. (a -> b) -> a -> b
$ \ActionRegistry m
reg -> do
Tracer m (AtLevel MergeTrace)
-> TableConfig
-> ResolveSerialisedValue
-> HasFS m h
-> HasBlockIO m h
-> SessionRoot
-> UniqCounter m
-> Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)
-> ActionRegistry m
-> TableContent m h
-> m (TableContent m h)
forall (m :: * -> *) h.
(MonadMask m, MonadMVar m, MonadSTM m, MonadST m) =>
Tracer m (AtLevel MergeTrace)
-> TableConfig
-> ResolveSerialisedValue
-> HasFS m h
-> HasBlockIO m h
-> SessionRoot
-> UniqCounter m
-> Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)
-> ActionRegistry m
-> TableContent m h
-> m (TableContent m h)
updatesWithInterleavedFlushes
(AtLevel MergeTrace -> TableTrace
TraceMerge (AtLevel MergeTrace -> TableTrace)
-> Tracer m TableTrace -> Tracer m (AtLevel MergeTrace)
forall a' a. (a' -> a) -> Tracer m a -> Tracer m a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
`contramap` Table m h -> Tracer m TableTrace
forall (m :: * -> *) h. Table m h -> Tracer m TableTrace
tableTracer Table m h
t)
TableConfig
conf
ResolveSerialisedValue
resolve
HasFS m h
hfs
(TableEnv m h -> HasBlockIO m h
forall (m :: * -> *) h. TableEnv m h -> HasBlockIO m h
tableHasBlockIO TableEnv m h
tEnv)
(TableEnv m h -> SessionRoot
forall (m :: * -> *) h. TableEnv m h -> SessionRoot
tableSessionRoot TableEnv m h
tEnv)
(TableEnv m h -> UniqCounter m
forall (m :: * -> *) h. TableEnv m h -> UniqCounter m
tableSessionUniqCounter TableEnv m h
tEnv)
Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)
es
ActionRegistry m
reg
data BlobRefInvalidError
=
ErrBlobRefInvalid !Int
deriving stock (Int -> BlobRefInvalidError -> ShowS
[BlobRefInvalidError] -> ShowS
BlobRefInvalidError -> String
(Int -> BlobRefInvalidError -> ShowS)
-> (BlobRefInvalidError -> String)
-> ([BlobRefInvalidError] -> ShowS)
-> Show BlobRefInvalidError
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> BlobRefInvalidError -> ShowS
showsPrec :: Int -> BlobRefInvalidError -> ShowS
$cshow :: BlobRefInvalidError -> String
show :: BlobRefInvalidError -> String
$cshowList :: [BlobRefInvalidError] -> ShowS
showList :: [BlobRefInvalidError] -> ShowS
Show, BlobRefInvalidError -> BlobRefInvalidError -> Bool
(BlobRefInvalidError -> BlobRefInvalidError -> Bool)
-> (BlobRefInvalidError -> BlobRefInvalidError -> Bool)
-> Eq BlobRefInvalidError
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: BlobRefInvalidError -> BlobRefInvalidError -> Bool
== :: BlobRefInvalidError -> BlobRefInvalidError -> Bool
$c/= :: BlobRefInvalidError -> BlobRefInvalidError -> Bool
/= :: BlobRefInvalidError -> BlobRefInvalidError -> Bool
Eq)
deriving anyclass (Show BlobRefInvalidError
Typeable BlobRefInvalidError
(Typeable BlobRefInvalidError, Show BlobRefInvalidError) =>
(BlobRefInvalidError -> SomeException)
-> (SomeException -> Maybe BlobRefInvalidError)
-> (BlobRefInvalidError -> String)
-> Exception BlobRefInvalidError
SomeException -> Maybe BlobRefInvalidError
BlobRefInvalidError -> String
BlobRefInvalidError -> SomeException
forall e.
(Typeable e, Show e) =>
(e -> SomeException)
-> (SomeException -> Maybe e) -> (e -> String) -> Exception e
$ctoException :: BlobRefInvalidError -> SomeException
toException :: BlobRefInvalidError -> SomeException
$cfromException :: SomeException -> Maybe BlobRefInvalidError
fromException :: SomeException -> Maybe BlobRefInvalidError
$cdisplayException :: BlobRefInvalidError -> String
displayException :: BlobRefInvalidError -> String
Exception)
{-# SPECIALISE retrieveBlobs ::
Session IO h
-> V.Vector (WeakBlobRef IO h)
-> IO (V.Vector SerialisedBlob) #-}
retrieveBlobs ::
(MonadMask m, MonadST m, MonadSTM m)
=> Session m h
-> V.Vector (WeakBlobRef m h)
-> m (V.Vector SerialisedBlob)
retrieveBlobs :: forall (m :: * -> *) h.
(MonadMask m, MonadST m, MonadSTM m) =>
Session m h
-> Vector (WeakBlobRef m h) -> m (Vector SerialisedBlob)
retrieveBlobs Session m h
sesh Vector (WeakBlobRef m h)
wrefs =
Session m h
-> (SessionEnv m h -> m (Vector SerialisedBlob))
-> m (Vector SerialisedBlob)
forall (m :: * -> *) h a.
(MonadSTM m, MonadThrow m) =>
Session m h -> (SessionEnv m h -> m a) -> m a
withOpenSession Session m h
sesh ((SessionEnv m h -> m (Vector SerialisedBlob))
-> m (Vector SerialisedBlob))
-> (SessionEnv m h -> m (Vector SerialisedBlob))
-> m (Vector SerialisedBlob)
forall a b. (a -> b) -> a -> b
$ \SessionEnv m h
seshEnv ->
let hbio :: HasBlockIO m h
hbio = SessionEnv m h -> HasBlockIO m h
forall (m :: * -> *) h. SessionEnv m h -> HasBlockIO m h
sessionHasBlockIO SessionEnv m h
seshEnv in
(WeakBlobRefInvalid -> m (Vector SerialisedBlob))
-> m (Vector SerialisedBlob) -> m (Vector SerialisedBlob)
forall e a. Exception e => (e -> m a) -> m a -> m a
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
(e -> m a) -> m a -> m a
handle (\(BlobRef.WeakBlobRefInvalid Int
i) ->
BlobRefInvalidError -> m (Vector SerialisedBlob)
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (Int -> BlobRefInvalidError
ErrBlobRefInvalid Int
i)) (m (Vector SerialisedBlob) -> m (Vector SerialisedBlob))
-> m (Vector SerialisedBlob) -> m (Vector SerialisedBlob)
forall a b. (a -> b) -> a -> b
$
HasBlockIO m h
-> Vector (WeakBlobRef m h) -> m (Vector SerialisedBlob)
forall (m :: * -> *) h.
(MonadMask m, PrimMonad m) =>
HasBlockIO m h
-> Vector (WeakBlobRef m h) -> m (Vector SerialisedBlob)
BlobRef.readWeakBlobRefs HasBlockIO m h
hbio Vector (WeakBlobRef m h)
wrefs
data Cursor m h = Cursor {
forall (m :: * -> *) h.
Cursor m h -> StrictMVar m (CursorState m h)
cursorState :: !(StrictMVar m (CursorState m h))
, forall (m :: * -> *) h. Cursor m h -> Tracer m CursorTrace
cursorTracer :: !(Tracer m CursorTrace)
}
instance NFData (Cursor m h) where
rnf :: Cursor m h -> ()
rnf (Cursor StrictMVar m (CursorState m h)
a Tracer m CursorTrace
b) = StrictMVar m (CursorState m h) -> ()
forall a. a -> ()
rwhnf StrictMVar m (CursorState m h)
a () -> () -> ()
forall a b. a -> b -> b
`seq` Tracer m CursorTrace -> ()
forall a. a -> ()
rwhnf Tracer m CursorTrace
b
data CursorState m h =
CursorOpen !(CursorEnv m h)
| CursorClosed
data CursorEnv m h = CursorEnv {
forall (m :: * -> *) h. CursorEnv m h -> Session m h
cursorSession :: !(Session m h)
, forall (m :: * -> *) h. CursorEnv m h -> SessionEnv m h
cursorSessionEnv :: !(SessionEnv m h)
, forall (m :: * -> *) h. CursorEnv m h -> CursorId
cursorId :: !CursorId
, forall (m :: * -> *) h. CursorEnv m h -> Maybe (Readers m h)
cursorReaders :: !(Maybe (Readers.Readers m h))
, forall (m :: * -> *) h. CursorEnv m h -> Vector (Ref (Run m h))
cursorRuns :: !(V.Vector (Ref (Run m h)))
, forall (m :: * -> *) h. CursorEnv m h -> Ref (WriteBufferBlobs m h)
cursorWBB :: !(Ref (WBB.WriteBufferBlobs m h))
}
{-# SPECIALISE withCursor ::
OffsetKey
-> Table IO h
-> (Cursor IO h -> IO a)
-> IO a #-}
withCursor ::
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m)
=> OffsetKey
-> Table m h
-> (Cursor m h -> m a)
-> m a
withCursor :: forall (m :: * -> *) h a.
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m) =>
OffsetKey -> Table m h -> (Cursor m h -> m a) -> m a
withCursor OffsetKey
offsetKey Table m h
t = m (Cursor m h)
-> (Cursor m h -> m ()) -> (Cursor m h -> m a) -> m a
forall a b c. m a -> (a -> m b) -> (a -> m c) -> m c
forall (m :: * -> *) a b c.
MonadThrow m =>
m a -> (a -> m b) -> (a -> m c) -> m c
bracket (OffsetKey -> Table m h -> m (Cursor m h)
forall (m :: * -> *) h.
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m) =>
OffsetKey -> Table m h -> m (Cursor m h)
newCursor OffsetKey
offsetKey Table m h
t) Cursor m h -> m ()
forall (m :: * -> *) h.
(MonadMask m, MonadMVar m, MonadSTM m, PrimMonad m) =>
Cursor m h -> m ()
closeCursor
{-# SPECIALISE newCursor ::
OffsetKey
-> Table IO h
-> IO (Cursor IO h) #-}
newCursor ::
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m)
=> OffsetKey
-> Table m h
-> m (Cursor m h)
newCursor :: forall (m :: * -> *) h.
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m) =>
OffsetKey -> Table m h -> m (Cursor m h)
newCursor !OffsetKey
offsetKey Table m h
t = Table m h -> (TableEnv m h -> m (Cursor m h)) -> m (Cursor m h)
forall (m :: * -> *) h a.
(MonadSTM m, MonadThrow m) =>
Table m h -> (TableEnv m h -> m a) -> m a
withOpenTable Table m h
t ((TableEnv m h -> m (Cursor m h)) -> m (Cursor m h))
-> (TableEnv m h -> m (Cursor m h)) -> m (Cursor m h)
forall a b. (a -> b) -> a -> b
$ \TableEnv m h
tEnv -> do
let cursorSession :: Session m h
cursorSession = Table m h -> Session m h
forall (m :: * -> *) h. Table m h -> Session m h
tableSession Table m h
t
let cursorSessionEnv :: SessionEnv m h
cursorSessionEnv = TableEnv m h -> SessionEnv m h
forall (m :: * -> *) h. TableEnv m h -> SessionEnv m h
tableSessionEnv TableEnv m h
tEnv
CursorId
cursorId <- Unique -> CursorId
uniqueToCursorId (Unique -> CursorId) -> m Unique -> m CursorId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
UniqCounter m -> m Unique
forall (m :: * -> *). PrimMonad m => UniqCounter m -> m Unique
incrUniqCounter (SessionEnv m h -> UniqCounter m
forall (m :: * -> *) h. SessionEnv m h -> UniqCounter m
sessionUniqCounter SessionEnv m h
cursorSessionEnv)
let cursorTracer :: Tracer m CursorTrace
cursorTracer = CursorId -> CursorTrace -> LSMTreeTrace
TraceCursor CursorId
cursorId (CursorTrace -> LSMTreeTrace)
-> Tracer m LSMTreeTrace -> Tracer m CursorTrace
forall a' a. (a' -> a) -> Tracer m a -> Tracer m a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
`contramap` Session m h -> Tracer m LSMTreeTrace
forall (m :: * -> *) h. Session m h -> Tracer m LSMTreeTrace
sessionTracer Session m h
cursorSession
Tracer m CursorTrace -> CursorTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith Tracer m CursorTrace
cursorTracer (CursorTrace -> m ()) -> CursorTrace -> m ()
forall a b. (a -> b) -> a -> b
$ TableId -> CursorTrace
TraceCreateCursor (Table m h -> TableId
forall (m :: * -> *) h. Table m h -> TableId
tableId Table m h
t)
Session m h -> (SessionEnv m h -> m (Cursor m h)) -> m (Cursor m h)
forall (m :: * -> *) h a.
(MonadSTM m, MonadThrow m) =>
Session m h -> (SessionEnv m h -> m a) -> m a
withOpenSession Session m h
cursorSession ((SessionEnv m h -> m (Cursor m h)) -> m (Cursor m h))
-> (SessionEnv m h -> m (Cursor m h)) -> m (Cursor m h)
forall a b. (a -> b) -> a -> b
$ \SessionEnv m h
_ -> do
(ActionRegistry m -> m (Cursor m h)) -> m (Cursor m h)
forall (m :: * -> *) a.
(PrimMonad m, MonadCatch m) =>
(ActionRegistry m -> m a) -> m a
withActionRegistry ((ActionRegistry m -> m (Cursor m h)) -> m (Cursor m h))
-> (ActionRegistry m -> m (Cursor m h)) -> m (Cursor m h)
forall a b. (a -> b) -> a -> b
$ \ActionRegistry m
reg -> do
(WriteBuffer
wb, Ref (WriteBufferBlobs m h)
wbblobs, Vector (Ref (Run m h))
cursorRuns) <- ActionRegistry m
-> RWVar m (TableContent m h)
-> m (WriteBuffer, Ref (WriteBufferBlobs m h),
Vector (Ref (Run m h)))
forall {m :: * -> *} {h}.
(MonadMask m, PrimMonad m, MonadSTM m) =>
ActionRegistry m
-> RWVar m (TableContent m h)
-> m (WriteBuffer, Ref (WriteBufferBlobs m h),
Vector (Ref (Run m h)))
dupTableContent ActionRegistry m
reg (TableEnv m h -> RWVar m (TableContent m h)
forall (m :: * -> *) h. TableEnv m h -> RWVar m (TableContent m h)
tableContent TableEnv m h
tEnv)
Maybe (Readers m h)
cursorReaders <-
ActionRegistry m
-> m (Maybe (Readers m h))
-> (Readers m h -> m ())
-> m (Maybe (Readers m h))
forall (m :: * -> *) a.
(PrimMonad m, MonadMask m, HasCallStack) =>
ActionRegistry m -> m (Maybe a) -> (a -> m ()) -> m (Maybe a)
withRollbackMaybe ActionRegistry m
reg
(OffsetKey
-> Maybe (WriteBuffer, Ref (WriteBufferBlobs m h))
-> Vector (Ref (Run m h))
-> m (Maybe (Readers m h))
forall (m :: * -> *) h.
(MonadMask m, MonadST m, MonadSTM m) =>
OffsetKey
-> Maybe (WriteBuffer, Ref (WriteBufferBlobs m h))
-> Vector (Ref (Run m h))
-> m (Maybe (Readers m h))
Readers.new OffsetKey
offsetKey ((WriteBuffer, Ref (WriteBufferBlobs m h))
-> Maybe (WriteBuffer, Ref (WriteBufferBlobs m h))
forall a. a -> Maybe a
Just (WriteBuffer
wb, Ref (WriteBufferBlobs m h)
wbblobs)) Vector (Ref (Run m h))
cursorRuns)
Readers m h -> m ()
forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, PrimMonad m) =>
Readers m h -> m ()
Readers.close
let cursorWBB :: Ref (WriteBufferBlobs m h)
cursorWBB = Ref (WriteBufferBlobs m h)
wbblobs
StrictMVar m (CursorState m h)
cursorState <- CursorState m h -> m (StrictMVar m (CursorState m h))
forall (m :: * -> *) a. MonadMVar m => a -> m (StrictMVar m a)
newMVar (CursorEnv m h -> CursorState m h
forall (m :: * -> *) h. CursorEnv m h -> CursorState m h
CursorOpen CursorEnv {Maybe (Readers m h)
Ref (WriteBufferBlobs m h)
Vector (Ref (Run m h))
CursorId
SessionEnv m h
Session m h
cursorSessionEnv :: SessionEnv m h
cursorSession :: Session m h
cursorId :: CursorId
cursorReaders :: Maybe (Readers m h)
cursorRuns :: Vector (Ref (Run m h))
cursorWBB :: Ref (WriteBufferBlobs m h)
cursorSession :: Session m h
cursorSessionEnv :: SessionEnv m h
cursorId :: CursorId
cursorRuns :: Vector (Ref (Run m h))
cursorReaders :: Maybe (Readers m h)
cursorWBB :: Ref (WriteBufferBlobs m h)
..})
let !cursor :: Cursor m h
cursor = Cursor {StrictMVar m (CursorState m h)
cursorState :: StrictMVar m (CursorState m h)
cursorState :: StrictMVar m (CursorState m h)
cursorState, Tracer m CursorTrace
cursorTracer :: Tracer m CursorTrace
cursorTracer :: Tracer m CursorTrace
cursorTracer}
ActionRegistry m -> m () -> m ()
forall (m :: * -> *).
(PrimMonad m, HasCallStack) =>
ActionRegistry m -> m () -> m ()
delayedCommit ActionRegistry m
reg (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
StrictMVar m (Map CursorId (Cursor m h))
-> (Map CursorId (Cursor m h) -> m (Map CursorId (Cursor m h)))
-> m ()
forall (m :: * -> *) a.
MonadMVar m =>
StrictMVar m a -> (a -> m a) -> m ()
modifyMVar_ (SessionEnv m h -> StrictMVar m (Map CursorId (Cursor m h))
forall (m :: * -> *) h.
SessionEnv m h -> StrictMVar m (Map CursorId (Cursor m h))
sessionOpenCursors SessionEnv m h
cursorSessionEnv) ((Map CursorId (Cursor m h) -> m (Map CursorId (Cursor m h)))
-> m ())
-> (Map CursorId (Cursor m h) -> m (Map CursorId (Cursor m h)))
-> m ()
forall a b. (a -> b) -> a -> b
$
Map CursorId (Cursor m h) -> m (Map CursorId (Cursor m h))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Map CursorId (Cursor m h) -> m (Map CursorId (Cursor m h)))
-> (Map CursorId (Cursor m h) -> Map CursorId (Cursor m h))
-> Map CursorId (Cursor m h)
-> m (Map CursorId (Cursor m h))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. CursorId
-> Cursor m h
-> Map CursorId (Cursor m h)
-> Map CursorId (Cursor m h)
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert CursorId
cursorId Cursor m h
cursor
Cursor m h -> m (Cursor m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Cursor m h -> m (Cursor m h)) -> Cursor m h -> m (Cursor m h)
forall a b. (a -> b) -> a -> b
$! Cursor m h
cursor
where
dupTableContent :: ActionRegistry m
-> RWVar m (TableContent m h)
-> m (WriteBuffer, Ref (WriteBufferBlobs m h),
Vector (Ref (Run m h)))
dupTableContent ActionRegistry m
reg RWVar m (TableContent m h)
contentVar = do
RWVar m (TableContent m h)
-> (TableContent m h
-> m (WriteBuffer, Ref (WriteBufferBlobs m h),
Vector (Ref (Run m h))))
-> m (WriteBuffer, Ref (WriteBufferBlobs m h),
Vector (Ref (Run m h)))
forall (m :: * -> *) a b.
(MonadSTM m, MonadThrow m) =>
RWVar m a -> (a -> m b) -> m b
RW.withReadAccess RWVar m (TableContent m h)
contentVar ((TableContent m h
-> m (WriteBuffer, Ref (WriteBufferBlobs m h),
Vector (Ref (Run m h))))
-> m (WriteBuffer, Ref (WriteBufferBlobs m h),
Vector (Ref (Run m h))))
-> (TableContent m h
-> m (WriteBuffer, Ref (WriteBufferBlobs m h),
Vector (Ref (Run m h))))
-> m (WriteBuffer, Ref (WriteBufferBlobs m h),
Vector (Ref (Run m h)))
forall a b. (a -> b) -> a -> b
$ \TableContent m h
content -> do
let !wb :: WriteBuffer
wb = TableContent m h -> WriteBuffer
forall (m :: * -> *) h. TableContent m h -> WriteBuffer
tableWriteBuffer TableContent m h
content
!wbblobs :: Ref (WriteBufferBlobs m h)
wbblobs = TableContent m h -> Ref (WriteBufferBlobs m h)
forall (m :: * -> *) h.
TableContent m h -> Ref (WriteBufferBlobs m h)
tableWriteBufferBlobs TableContent m h
content
Ref (WriteBufferBlobs m h)
wbblobs' <- ActionRegistry m
-> m (Ref (WriteBufferBlobs m h))
-> (Ref (WriteBufferBlobs m h) -> m ())
-> m (Ref (WriteBufferBlobs m h))
forall (m :: * -> *) a.
(PrimMonad m, MonadMask m, HasCallStack) =>
ActionRegistry m -> m a -> (a -> m ()) -> m a
withRollback ActionRegistry m
reg (Ref (WriteBufferBlobs m h) -> m (Ref (WriteBufferBlobs m h))
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadThrow m, HasCallStack) =>
Ref obj -> m (Ref obj)
dupRef Ref (WriteBufferBlobs m h)
wbblobs) Ref (WriteBufferBlobs m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef
let runs :: Vector (Ref (Run m h))
runs = LevelsCache m h -> Vector (Ref (Run m h))
forall (m :: * -> *) h. LevelsCache m h -> Vector (Ref (Run m h))
cachedRuns (TableContent m h -> LevelsCache m h
forall (m :: * -> *) h. TableContent m h -> LevelsCache m h
tableCache TableContent m h
content)
Vector (Ref (Run m h))
runs' <- Vector (Ref (Run m h))
-> (Ref (Run m h) -> m (Ref (Run m h)))
-> m (Vector (Ref (Run m h)))
forall (m :: * -> *) a b.
Monad m =>
Vector a -> (a -> m b) -> m (Vector b)
V.forM Vector (Ref (Run m h))
runs ((Ref (Run m h) -> m (Ref (Run m h)))
-> m (Vector (Ref (Run m h))))
-> (Ref (Run m h) -> m (Ref (Run m h)))
-> m (Vector (Ref (Run m h)))
forall a b. (a -> b) -> a -> b
$ \Ref (Run m h)
r ->
ActionRegistry m
-> m (Ref (Run m h))
-> (Ref (Run m h) -> m ())
-> m (Ref (Run m h))
forall (m :: * -> *) a.
(PrimMonad m, MonadMask m, HasCallStack) =>
ActionRegistry m -> m a -> (a -> m ()) -> m a
withRollback ActionRegistry m
reg (Ref (Run m h) -> m (Ref (Run m h))
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadThrow m, HasCallStack) =>
Ref obj -> m (Ref obj)
dupRef Ref (Run m h)
r) Ref (Run m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef
(WriteBuffer, Ref (WriteBufferBlobs m h), Vector (Ref (Run m h)))
-> m (WriteBuffer, Ref (WriteBufferBlobs m h),
Vector (Ref (Run m h)))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (WriteBuffer
wb, Ref (WriteBufferBlobs m h)
wbblobs', Vector (Ref (Run m h))
runs')
{-# SPECIALISE closeCursor :: Cursor IO h -> IO () #-}
closeCursor ::
(MonadMask m, MonadMVar m, MonadSTM m, PrimMonad m)
=> Cursor m h
-> m ()
closeCursor :: forall (m :: * -> *) h.
(MonadMask m, MonadMVar m, MonadSTM m, PrimMonad m) =>
Cursor m h -> m ()
closeCursor Cursor {Tracer m CursorTrace
StrictMVar m (CursorState m h)
cursorState :: forall (m :: * -> *) h.
Cursor m h -> StrictMVar m (CursorState m h)
cursorTracer :: forall (m :: * -> *) h. Cursor m h -> Tracer m CursorTrace
cursorState :: StrictMVar m (CursorState m h)
cursorTracer :: Tracer m CursorTrace
..} = do
Tracer m CursorTrace -> CursorTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith Tracer m CursorTrace
cursorTracer (CursorTrace -> m ()) -> CursorTrace -> m ()
forall a b. (a -> b) -> a -> b
$ CursorTrace
TraceCloseCursor
m (CursorState m h)
-> (CursorState m h -> m ())
-> (ActionRegistry m -> CursorState m h -> m (CursorState m h))
-> m ()
forall (m :: * -> *) st.
(PrimMonad m, MonadCatch m) =>
m st -> (st -> m ()) -> (ActionRegistry m -> st -> m st) -> m ()
modifyWithActionRegistry_ (StrictMVar m (CursorState m h) -> m (CursorState m h)
forall (m :: * -> *) a. MonadMVar m => StrictMVar m a -> m a
takeMVar StrictMVar m (CursorState m h)
cursorState) (StrictMVar m (CursorState m h) -> CursorState m h -> m ()
forall (m :: * -> *) a. MonadMVar m => StrictMVar m a -> a -> m ()
putMVar StrictMVar m (CursorState m h)
cursorState) ((ActionRegistry m -> CursorState m h -> m (CursorState m h))
-> m ())
-> (ActionRegistry m -> CursorState m h -> m (CursorState m h))
-> m ()
forall a b. (a -> b) -> a -> b
$ \ActionRegistry m
reg -> \case
CursorState m h
CursorClosed -> CursorState m h -> m (CursorState m h)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return CursorState m h
forall (m :: * -> *) h. CursorState m h
CursorClosed
CursorOpen CursorEnv {Maybe (Readers m h)
Ref (WriteBufferBlobs m h)
Vector (Ref (Run m h))
CursorId
SessionEnv m h
Session m h
cursorSessionEnv :: forall (m :: * -> *) h. CursorEnv m h -> SessionEnv m h
cursorSession :: forall (m :: * -> *) h. CursorEnv m h -> Session m h
cursorId :: forall (m :: * -> *) h. CursorEnv m h -> CursorId
cursorReaders :: forall (m :: * -> *) h. CursorEnv m h -> Maybe (Readers m h)
cursorRuns :: forall (m :: * -> *) h. CursorEnv m h -> Vector (Ref (Run m h))
cursorWBB :: forall (m :: * -> *) h. CursorEnv m h -> Ref (WriteBufferBlobs m h)
cursorSession :: Session m h
cursorSessionEnv :: SessionEnv m h
cursorId :: CursorId
cursorReaders :: Maybe (Readers m h)
cursorRuns :: Vector (Ref (Run m h))
cursorWBB :: Ref (WriteBufferBlobs m h)
..} -> do
ActionRegistry m -> m () -> m ()
forall (m :: * -> *).
(PrimMonad m, HasCallStack) =>
ActionRegistry m -> m () -> m ()
delayedCommit ActionRegistry m
reg (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
StrictMVar m (Map CursorId (Cursor m h))
-> (Map CursorId (Cursor m h) -> m (Map CursorId (Cursor m h)))
-> m ()
forall (m :: * -> *) a.
MonadMVar m =>
StrictMVar m a -> (a -> m a) -> m ()
modifyMVar_ (SessionEnv m h -> StrictMVar m (Map CursorId (Cursor m h))
forall (m :: * -> *) h.
SessionEnv m h -> StrictMVar m (Map CursorId (Cursor m h))
sessionOpenCursors SessionEnv m h
cursorSessionEnv) ((Map CursorId (Cursor m h) -> m (Map CursorId (Cursor m h)))
-> m ())
-> (Map CursorId (Cursor m h) -> m (Map CursorId (Cursor m h)))
-> m ()
forall a b. (a -> b) -> a -> b
$
Map CursorId (Cursor m h) -> m (Map CursorId (Cursor m h))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Map CursorId (Cursor m h) -> m (Map CursorId (Cursor m h)))
-> (Map CursorId (Cursor m h) -> Map CursorId (Cursor m h))
-> Map CursorId (Cursor m h)
-> m (Map CursorId (Cursor m h))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. CursorId -> Map CursorId (Cursor m h) -> Map CursorId (Cursor m h)
forall k a. Ord k => k -> Map k a -> Map k a
Map.delete CursorId
cursorId
Maybe (Readers m h) -> (Readers m h -> m ()) -> m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ Maybe (Readers m h)
cursorReaders ((Readers m h -> m ()) -> m ()) -> (Readers m h -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ ActionRegistry m -> m () -> m ()
forall (m :: * -> *).
(PrimMonad m, HasCallStack) =>
ActionRegistry m -> m () -> m ()
delayedCommit ActionRegistry m
reg (m () -> m ()) -> (Readers m h -> m ()) -> Readers m h -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Readers m h -> m ()
forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, PrimMonad m) =>
Readers m h -> m ()
Readers.close
Vector (Ref (Run m h)) -> (Ref (Run m h) -> m ()) -> m ()
forall (m :: * -> *) a b. Monad m => Vector a -> (a -> m b) -> m ()
V.forM_ Vector (Ref (Run m h))
cursorRuns ((Ref (Run m h) -> m ()) -> m ())
-> (Ref (Run m h) -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ ActionRegistry m -> m () -> m ()
forall (m :: * -> *).
(PrimMonad m, HasCallStack) =>
ActionRegistry m -> m () -> m ()
delayedCommit ActionRegistry m
reg (m () -> m ()) -> (Ref (Run m h) -> m ()) -> Ref (Run m h) -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Ref (Run m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef
ActionRegistry m -> m () -> m ()
forall (m :: * -> *).
(PrimMonad m, HasCallStack) =>
ActionRegistry m -> m () -> m ()
delayedCommit ActionRegistry m
reg (Ref (WriteBufferBlobs m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef Ref (WriteBufferBlobs m h)
cursorWBB)
CursorState m h -> m (CursorState m h)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return CursorState m h
forall (m :: * -> *) h. CursorState m h
CursorClosed
{-# SPECIALISE readCursor ::
ResolveSerialisedValue
-> Int
-> Cursor IO h
-> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef IO h) -> res)
-> IO (V.Vector res) #-}
readCursor ::
forall m h res.
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m)
=> ResolveSerialisedValue
-> Int
-> Cursor m h
-> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef m h) -> res)
-> m (V.Vector res)
readCursor :: forall (m :: * -> *) h res.
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m) =>
ResolveSerialisedValue
-> Int
-> Cursor m h
-> (SerialisedKey
-> SerialisedValue -> Maybe (WeakBlobRef m h) -> res)
-> m (Vector res)
readCursor ResolveSerialisedValue
resolve Int
n Cursor m h
cursor SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef m h) -> res
fromEntry =
ResolveSerialisedValue
-> (SerialisedKey -> Bool)
-> Int
-> Cursor m h
-> (SerialisedKey
-> SerialisedValue -> Maybe (WeakBlobRef m h) -> res)
-> m (Vector res)
forall (m :: * -> *) h res.
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m) =>
ResolveSerialisedValue
-> (SerialisedKey -> Bool)
-> Int
-> Cursor m h
-> (SerialisedKey
-> SerialisedValue -> Maybe (WeakBlobRef m h) -> res)
-> m (Vector res)
readCursorWhile ResolveSerialisedValue
resolve (Bool -> SerialisedKey -> Bool
forall a b. a -> b -> a
const Bool
True) Int
n Cursor m h
cursor SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef m h) -> res
fromEntry
data CursorClosedError
= ErrCursorClosed
deriving stock (Int -> CursorClosedError -> ShowS
[CursorClosedError] -> ShowS
CursorClosedError -> String
(Int -> CursorClosedError -> ShowS)
-> (CursorClosedError -> String)
-> ([CursorClosedError] -> ShowS)
-> Show CursorClosedError
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> CursorClosedError -> ShowS
showsPrec :: Int -> CursorClosedError -> ShowS
$cshow :: CursorClosedError -> String
show :: CursorClosedError -> String
$cshowList :: [CursorClosedError] -> ShowS
showList :: [CursorClosedError] -> ShowS
Show, CursorClosedError -> CursorClosedError -> Bool
(CursorClosedError -> CursorClosedError -> Bool)
-> (CursorClosedError -> CursorClosedError -> Bool)
-> Eq CursorClosedError
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: CursorClosedError -> CursorClosedError -> Bool
== :: CursorClosedError -> CursorClosedError -> Bool
$c/= :: CursorClosedError -> CursorClosedError -> Bool
/= :: CursorClosedError -> CursorClosedError -> Bool
Eq)
deriving anyclass (Show CursorClosedError
Typeable CursorClosedError
(Typeable CursorClosedError, Show CursorClosedError) =>
(CursorClosedError -> SomeException)
-> (SomeException -> Maybe CursorClosedError)
-> (CursorClosedError -> String)
-> Exception CursorClosedError
SomeException -> Maybe CursorClosedError
CursorClosedError -> String
CursorClosedError -> SomeException
forall e.
(Typeable e, Show e) =>
(e -> SomeException)
-> (SomeException -> Maybe e) -> (e -> String) -> Exception e
$ctoException :: CursorClosedError -> SomeException
toException :: CursorClosedError -> SomeException
$cfromException :: SomeException -> Maybe CursorClosedError
fromException :: SomeException -> Maybe CursorClosedError
$cdisplayException :: CursorClosedError -> String
displayException :: CursorClosedError -> String
Exception)
{-# SPECIALISE readCursorWhile ::
ResolveSerialisedValue
-> (SerialisedKey -> Bool)
-> Int
-> Cursor IO h
-> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef IO h) -> res)
-> IO (V.Vector res) #-}
readCursorWhile ::
forall m h res.
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m)
=> ResolveSerialisedValue
-> (SerialisedKey -> Bool)
-> Int
-> Cursor m h
-> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef m h) -> res)
-> m (V.Vector res)
readCursorWhile :: forall (m :: * -> *) h res.
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m) =>
ResolveSerialisedValue
-> (SerialisedKey -> Bool)
-> Int
-> Cursor m h
-> (SerialisedKey
-> SerialisedValue -> Maybe (WeakBlobRef m h) -> res)
-> m (Vector res)
readCursorWhile ResolveSerialisedValue
resolve SerialisedKey -> Bool
keyIsWanted Int
n Cursor {Tracer m CursorTrace
StrictMVar m (CursorState m h)
cursorState :: forall (m :: * -> *) h.
Cursor m h -> StrictMVar m (CursorState m h)
cursorTracer :: forall (m :: * -> *) h. Cursor m h -> Tracer m CursorTrace
cursorState :: StrictMVar m (CursorState m h)
cursorTracer :: Tracer m CursorTrace
..} SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef m h) -> res
fromEntry = do
Tracer m CursorTrace -> CursorTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith Tracer m CursorTrace
cursorTracer (CursorTrace -> m ()) -> CursorTrace -> m ()
forall a b. (a -> b) -> a -> b
$ Int -> CursorTrace
TraceReadCursor Int
n
StrictMVar m (CursorState m h)
-> (CursorState m h -> m (CursorState m h, Vector res))
-> m (Vector res)
forall (m :: * -> *) a b.
MonadMVar m =>
StrictMVar m a -> (a -> m (a, b)) -> m b
modifyMVar StrictMVar m (CursorState m h)
cursorState ((CursorState m h -> m (CursorState m h, Vector res))
-> m (Vector res))
-> (CursorState m h -> m (CursorState m h, Vector res))
-> m (Vector res)
forall a b. (a -> b) -> a -> b
$ \case
CursorState m h
CursorClosed -> CursorClosedError -> m (CursorState m h, Vector res)
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO CursorClosedError
ErrCursorClosed
state :: CursorState m h
state@(CursorOpen CursorEnv m h
cursorEnv) -> do
case CursorEnv m h -> Maybe (Readers m h)
forall (m :: * -> *) h. CursorEnv m h -> Maybe (Readers m h)
cursorReaders CursorEnv m h
cursorEnv of
Maybe (Readers m h)
Nothing ->
(CursorState m h, Vector res) -> m (CursorState m h, Vector res)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (CursorState m h
state, Vector res
forall a. Vector a
V.empty)
Just Readers m h
readers -> do
(Vector res
vec, HasMore
hasMore) <- ResolveSerialisedValue
-> (SerialisedKey -> Bool)
-> (SerialisedKey
-> SerialisedValue -> Maybe (WeakBlobRef m h) -> res)
-> Readers m h
-> Int
-> m (Vector res, HasMore)
forall h (m :: * -> *) res.
(MonadMask m, MonadST m, MonadSTM m) =>
ResolveSerialisedValue
-> (SerialisedKey -> Bool)
-> (SerialisedKey
-> SerialisedValue -> Maybe (WeakBlobRef m h) -> res)
-> Readers m h
-> Int
-> m (Vector res, HasMore)
Cursor.readEntriesWhile ResolveSerialisedValue
resolve SerialisedKey -> Bool
keyIsWanted SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef m h) -> res
fromEntry Readers m h
readers Int
n
let !state' :: CursorState m h
state' = case HasMore
hasMore of
HasMore
Readers.HasMore -> CursorState m h
state
HasMore
Readers.Drained -> CursorEnv m h -> CursorState m h
forall (m :: * -> *) h. CursorEnv m h -> CursorState m h
CursorOpen (CursorEnv m h
cursorEnv {cursorReaders = Nothing})
(CursorState m h, Vector res) -> m (CursorState m h, Vector res)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (CursorState m h
state', Vector res
vec)
data SnapshotExistsError
= ErrSnapshotExists !SnapshotName
deriving stock (Int -> SnapshotExistsError -> ShowS
[SnapshotExistsError] -> ShowS
SnapshotExistsError -> String
(Int -> SnapshotExistsError -> ShowS)
-> (SnapshotExistsError -> String)
-> ([SnapshotExistsError] -> ShowS)
-> Show SnapshotExistsError
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> SnapshotExistsError -> ShowS
showsPrec :: Int -> SnapshotExistsError -> ShowS
$cshow :: SnapshotExistsError -> String
show :: SnapshotExistsError -> String
$cshowList :: [SnapshotExistsError] -> ShowS
showList :: [SnapshotExistsError] -> ShowS
Show, SnapshotExistsError -> SnapshotExistsError -> Bool
(SnapshotExistsError -> SnapshotExistsError -> Bool)
-> (SnapshotExistsError -> SnapshotExistsError -> Bool)
-> Eq SnapshotExistsError
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: SnapshotExistsError -> SnapshotExistsError -> Bool
== :: SnapshotExistsError -> SnapshotExistsError -> Bool
$c/= :: SnapshotExistsError -> SnapshotExistsError -> Bool
/= :: SnapshotExistsError -> SnapshotExistsError -> Bool
Eq)
deriving anyclass (Show SnapshotExistsError
Typeable SnapshotExistsError
(Typeable SnapshotExistsError, Show SnapshotExistsError) =>
(SnapshotExistsError -> SomeException)
-> (SomeException -> Maybe SnapshotExistsError)
-> (SnapshotExistsError -> String)
-> Exception SnapshotExistsError
SomeException -> Maybe SnapshotExistsError
SnapshotExistsError -> String
SnapshotExistsError -> SomeException
forall e.
(Typeable e, Show e) =>
(e -> SomeException)
-> (SomeException -> Maybe e) -> (e -> String) -> Exception e
$ctoException :: SnapshotExistsError -> SomeException
toException :: SnapshotExistsError -> SomeException
$cfromException :: SomeException -> Maybe SnapshotExistsError
fromException :: SomeException -> Maybe SnapshotExistsError
$cdisplayException :: SnapshotExistsError -> String
displayException :: SnapshotExistsError -> String
Exception)
{-# SPECIALISE createSnapshot ::
SnapshotName
-> SnapshotLabel
-> SnapshotTableType
-> Table IO h
-> IO () #-}
createSnapshot ::
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m)
=> SnapshotName
-> SnapshotLabel
-> SnapshotTableType
-> Table m h
-> m ()
createSnapshot :: forall (m :: * -> *) h.
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m) =>
SnapshotName
-> SnapshotLabel -> SnapshotTableType -> Table m h -> m ()
createSnapshot SnapshotName
snap SnapshotLabel
label SnapshotTableType
tableType Table m h
t = do
Tracer m TableTrace -> TableTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith (Table m h -> Tracer m TableTrace
forall (m :: * -> *) h. Table m h -> Tracer m TableTrace
tableTracer Table m h
t) (TableTrace -> m ()) -> TableTrace -> m ()
forall a b. (a -> b) -> a -> b
$ SnapshotName -> TableTrace
TraceSnapshot SnapshotName
snap
Table m h -> (TableEnv m h -> m ()) -> m ()
forall (m :: * -> *) h a.
(MonadSTM m, MonadThrow m) =>
Table m h -> (TableEnv m h -> m a) -> m a
withOpenTable Table m h
t ((TableEnv m h -> m ()) -> m ()) -> (TableEnv m h -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \TableEnv m h
tEnv ->
(ActionRegistry m -> m ()) -> m ()
forall (m :: * -> *) a.
(PrimMonad m, MonadCatch m) =>
(ActionRegistry m -> m a) -> m a
withActionRegistry ((ActionRegistry m -> m ()) -> m ())
-> (ActionRegistry m -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \ActionRegistry m
reg -> do
let hfs :: HasFS m h
hfs = TableEnv m h -> HasFS m h
forall (m :: * -> *) h. TableEnv m h -> HasFS m h
tableHasFS TableEnv m h
tEnv
hbio :: HasBlockIO m h
hbio = TableEnv m h -> HasBlockIO m h
forall (m :: * -> *) h. TableEnv m h -> HasBlockIO m h
tableHasBlockIO TableEnv m h
tEnv
activeUc :: UniqCounter m
activeUc = TableEnv m h -> UniqCounter m
forall (m :: * -> *) h. TableEnv m h -> UniqCounter m
tableSessionUniqCounter TableEnv m h
tEnv
let snapDir :: NamedSnapshotDir
snapDir = SessionRoot -> SnapshotName -> NamedSnapshotDir
Paths.namedSnapshotDir (TableEnv m h -> SessionRoot
forall (m :: * -> *) h. TableEnv m h -> SessionRoot
tableSessionRoot TableEnv m h
tEnv) SnapshotName
snap
Bool
snapshotExists <- SnapshotName -> SessionEnv m h -> m Bool
forall (m :: * -> *) h. SnapshotName -> SessionEnv m h -> m Bool
doesSnapshotDirExist SnapshotName
snap (TableEnv m h -> SessionEnv m h
forall (m :: * -> *) h. TableEnv m h -> SessionEnv m h
tableSessionEnv TableEnv m h
tEnv)
if Bool
snapshotExists then
SnapshotExistsError -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (SnapshotName -> SnapshotExistsError
ErrSnapshotExists SnapshotName
snap)
else
ActionRegistry m -> m () -> m () -> m ()
forall (m :: * -> *) a.
(PrimMonad m, MonadMask m, HasCallStack) =>
ActionRegistry m -> m a -> m () -> m a
withRollback_ ActionRegistry m
reg
(HasFS m h -> HasCallStack => FsPath -> m ()
forall (m :: * -> *) h. HasFS m h -> HasCallStack => FsPath -> m ()
FS.createDirectory HasFS m h
hfs (NamedSnapshotDir -> FsPath
Paths.getNamedSnapshotDir NamedSnapshotDir
snapDir))
(HasFS m h -> HasCallStack => FsPath -> m ()
forall (m :: * -> *) h. HasFS m h -> HasCallStack => FsPath -> m ()
FS.removeDirectoryRecursive HasFS m h
hfs (NamedSnapshotDir -> FsPath
Paths.getNamedSnapshotDir NamedSnapshotDir
snapDir))
TableContent m h
content <- RWVar m (TableContent m h)
-> (TableContent m h -> m (TableContent m h))
-> m (TableContent m h)
forall (m :: * -> *) a b.
(MonadSTM m, MonadThrow m) =>
RWVar m a -> (a -> m b) -> m b
RW.withReadAccess (TableEnv m h -> RWVar m (TableContent m h)
forall (m :: * -> *) h. TableEnv m h -> RWVar m (TableContent m h)
tableContent TableEnv m h
tEnv) (ActionRegistry m -> TableContent m h -> m (TableContent m h)
forall (m :: * -> *) h.
(PrimMonad m, MonadMask m) =>
ActionRegistry m -> TableContent m h -> m (TableContent m h)
duplicateTableContent ActionRegistry m
reg)
UniqCounter m
snapUc <- Int -> m (UniqCounter m)
forall (m :: * -> *). PrimMonad m => Int -> m (UniqCounter m)
newUniqCounter Int
0
let activeDir :: ActiveDir
activeDir = SessionRoot -> ActiveDir
Paths.activeDir (TableEnv m h -> SessionRoot
forall (m :: * -> *) h. TableEnv m h -> SessionRoot
tableSessionRoot TableEnv m h
tEnv)
let wb :: WriteBuffer
wb = TableContent m h -> WriteBuffer
forall (m :: * -> *) h. TableContent m h -> WriteBuffer
tableWriteBuffer TableContent m h
content
let wbb :: Ref (WriteBufferBlobs m h)
wbb = TableContent m h -> Ref (WriteBufferBlobs m h)
forall (m :: * -> *) h.
TableContent m h -> Ref (WriteBufferBlobs m h)
tableWriteBufferBlobs TableContent m h
content
RunNumber
snapWriteBufferNumber <- WriteBufferFsPaths -> RunNumber
Paths.writeBufferNumber (WriteBufferFsPaths -> RunNumber)
-> m WriteBufferFsPaths -> m RunNumber
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
HasFS m h
-> HasBlockIO m h
-> UniqCounter m
-> UniqCounter m
-> ActionRegistry m
-> ActiveDir
-> NamedSnapshotDir
-> WriteBuffer
-> Ref (WriteBufferBlobs m h)
-> m WriteBufferFsPaths
forall (m :: * -> *) h.
(MonadMVar m, MonadSTM m, MonadST m, MonadMask m) =>
HasFS m h
-> HasBlockIO m h
-> UniqCounter m
-> UniqCounter m
-> ActionRegistry m
-> ActiveDir
-> NamedSnapshotDir
-> WriteBuffer
-> Ref (WriteBufferBlobs m h)
-> m WriteBufferFsPaths
snapshotWriteBuffer HasFS m h
hfs HasBlockIO m h
hbio UniqCounter m
activeUc UniqCounter m
snapUc ActionRegistry m
reg ActiveDir
activeDir NamedSnapshotDir
snapDir WriteBuffer
wb Ref (WriteBufferBlobs m h)
wbb
SnapLevels (Ref (Run m h))
snapLevels <- Levels m h -> m (SnapLevels (Ref (Run m h)))
forall (m :: * -> *) h.
(PrimMonad m, MonadMVar m) =>
Levels m h -> m (SnapLevels (Ref (Run m h)))
toSnapLevels (TableContent m h -> Levels m h
forall (m :: * -> *) h. TableContent m h -> Levels m h
tableLevels TableContent m h
content)
SnapLevels SnapshotRun
snapLevels' <- (Ref (Run m h) -> m SnapshotRun)
-> SnapLevels (Ref (Run m h)) -> m (SnapLevels SnapshotRun)
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> SnapLevels a -> f (SnapLevels b)
traverse (HasFS m h
-> HasBlockIO m h
-> UniqCounter m
-> ActionRegistry m
-> NamedSnapshotDir
-> Ref (Run m h)
-> m SnapshotRun
forall (m :: * -> *) h.
(MonadMask m, PrimMonad m) =>
HasFS m h
-> HasBlockIO m h
-> UniqCounter m
-> ActionRegistry m
-> NamedSnapshotDir
-> Ref (Run m h)
-> m SnapshotRun
snapshotRun HasFS m h
hfs HasBlockIO m h
hbio UniqCounter m
snapUc ActionRegistry m
reg NamedSnapshotDir
snapDir) SnapLevels (Ref (Run m h))
snapLevels
Maybe (SnapMergingTree SnapshotRun)
mTreeOpt <- case TableContent m h -> UnionLevel m h
forall (m :: * -> *) h. TableContent m h -> UnionLevel m h
tableUnionLevel TableContent m h
content of
UnionLevel m h
NoUnion -> Maybe (SnapMergingTree SnapshotRun)
-> m (Maybe (SnapMergingTree SnapshotRun))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe (SnapMergingTree SnapshotRun)
forall a. Maybe a
Nothing
Union Ref (MergingTree m h)
mTreeRef -> do
SnapMergingTree (Ref (Run m h))
mTree <- Ref (MergingTree m h) -> m (SnapMergingTree (Ref (Run m h)))
forall (m :: * -> *) h.
(PrimMonad m, MonadMVar m) =>
Ref (MergingTree m h) -> m (SnapMergingTree (Ref (Run m h)))
toSnapMergingTree Ref (MergingTree m h)
mTreeRef
SnapMergingTree SnapshotRun -> Maybe (SnapMergingTree SnapshotRun)
forall a. a -> Maybe a
Just (SnapMergingTree SnapshotRun
-> Maybe (SnapMergingTree SnapshotRun))
-> m (SnapMergingTree SnapshotRun)
-> m (Maybe (SnapMergingTree SnapshotRun))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (Ref (Run m h) -> m SnapshotRun)
-> SnapMergingTree (Ref (Run m h))
-> m (SnapMergingTree SnapshotRun)
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> SnapMergingTree a -> f (SnapMergingTree b)
traverse (HasFS m h
-> HasBlockIO m h
-> UniqCounter m
-> ActionRegistry m
-> NamedSnapshotDir
-> Ref (Run m h)
-> m SnapshotRun
forall (m :: * -> *) h.
(MonadMask m, PrimMonad m) =>
HasFS m h
-> HasBlockIO m h
-> UniqCounter m
-> ActionRegistry m
-> NamedSnapshotDir
-> Ref (Run m h)
-> m SnapshotRun
snapshotRun HasFS m h
hfs HasBlockIO m h
hbio UniqCounter m
snapUc ActionRegistry m
reg NamedSnapshotDir
snapDir) SnapMergingTree (Ref (Run m h))
mTree
ActionRegistry m -> TableContent m h -> m ()
forall (m :: * -> *) h.
(PrimMonad m, MonadMask m) =>
ActionRegistry m -> TableContent m h -> m ()
releaseTableContent ActionRegistry m
reg TableContent m h
content
let snapMetaData :: SnapshotMetaData
snapMetaData = SnapshotLabel
-> SnapshotTableType
-> TableConfig
-> RunNumber
-> SnapLevels SnapshotRun
-> Maybe (SnapMergingTree SnapshotRun)
-> SnapshotMetaData
SnapshotMetaData
SnapshotLabel
label
SnapshotTableType
tableType
(Table m h -> TableConfig
forall (m :: * -> *) h. Table m h -> TableConfig
tableConfig Table m h
t)
RunNumber
snapWriteBufferNumber
SnapLevels SnapshotRun
snapLevels'
Maybe (SnapMergingTree SnapshotRun)
mTreeOpt
SnapshotMetaDataFile FsPath
contentPath = NamedSnapshotDir -> SnapshotMetaDataFile
Paths.snapshotMetaDataFile NamedSnapshotDir
snapDir
SnapshotMetaDataChecksumFile FsPath
checksumPath = NamedSnapshotDir -> SnapshotMetaDataChecksumFile
Paths.snapshotMetaDataChecksumFile NamedSnapshotDir
snapDir
HasFS m h -> FsPath -> FsPath -> SnapshotMetaData -> m ()
forall (m :: * -> *) h.
MonadThrow m =>
HasFS m h -> FsPath -> FsPath -> SnapshotMetaData -> m ()
writeFileSnapshotMetaData HasFS m h
hfs FsPath
contentPath FsPath
checksumPath SnapshotMetaData
snapMetaData
HasFS m h -> HasBlockIO m h -> FsPath -> m ()
forall (m :: * -> *) h.
MonadThrow m =>
HasFS m h -> HasBlockIO m h -> FsPath -> m ()
FS.synchroniseDirectoryRecursive HasFS m h
hfs HasBlockIO m h
hbio (NamedSnapshotDir -> FsPath
Paths.getNamedSnapshotDir NamedSnapshotDir
snapDir)
data SnapshotDoesNotExistError
= ErrSnapshotDoesNotExist !SnapshotName
deriving stock (Int -> SnapshotDoesNotExistError -> ShowS
[SnapshotDoesNotExistError] -> ShowS
SnapshotDoesNotExistError -> String
(Int -> SnapshotDoesNotExistError -> ShowS)
-> (SnapshotDoesNotExistError -> String)
-> ([SnapshotDoesNotExistError] -> ShowS)
-> Show SnapshotDoesNotExistError
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> SnapshotDoesNotExistError -> ShowS
showsPrec :: Int -> SnapshotDoesNotExistError -> ShowS
$cshow :: SnapshotDoesNotExistError -> String
show :: SnapshotDoesNotExistError -> String
$cshowList :: [SnapshotDoesNotExistError] -> ShowS
showList :: [SnapshotDoesNotExistError] -> ShowS
Show, SnapshotDoesNotExistError -> SnapshotDoesNotExistError -> Bool
(SnapshotDoesNotExistError -> SnapshotDoesNotExistError -> Bool)
-> (SnapshotDoesNotExistError -> SnapshotDoesNotExistError -> Bool)
-> Eq SnapshotDoesNotExistError
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: SnapshotDoesNotExistError -> SnapshotDoesNotExistError -> Bool
== :: SnapshotDoesNotExistError -> SnapshotDoesNotExistError -> Bool
$c/= :: SnapshotDoesNotExistError -> SnapshotDoesNotExistError -> Bool
/= :: SnapshotDoesNotExistError -> SnapshotDoesNotExistError -> Bool
Eq)
deriving anyclass (Show SnapshotDoesNotExistError
Typeable SnapshotDoesNotExistError
(Typeable SnapshotDoesNotExistError,
Show SnapshotDoesNotExistError) =>
(SnapshotDoesNotExistError -> SomeException)
-> (SomeException -> Maybe SnapshotDoesNotExistError)
-> (SnapshotDoesNotExistError -> String)
-> Exception SnapshotDoesNotExistError
SomeException -> Maybe SnapshotDoesNotExistError
SnapshotDoesNotExistError -> String
SnapshotDoesNotExistError -> SomeException
forall e.
(Typeable e, Show e) =>
(e -> SomeException)
-> (SomeException -> Maybe e) -> (e -> String) -> Exception e
$ctoException :: SnapshotDoesNotExistError -> SomeException
toException :: SnapshotDoesNotExistError -> SomeException
$cfromException :: SomeException -> Maybe SnapshotDoesNotExistError
fromException :: SomeException -> Maybe SnapshotDoesNotExistError
$cdisplayException :: SnapshotDoesNotExistError -> String
displayException :: SnapshotDoesNotExistError -> String
Exception)
data SnapshotCorruptedError
= ErrSnapshotCorrupted
!SnapshotName
!FileCorruptedError
deriving stock (Int -> SnapshotCorruptedError -> ShowS
[SnapshotCorruptedError] -> ShowS
SnapshotCorruptedError -> String
(Int -> SnapshotCorruptedError -> ShowS)
-> (SnapshotCorruptedError -> String)
-> ([SnapshotCorruptedError] -> ShowS)
-> Show SnapshotCorruptedError
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> SnapshotCorruptedError -> ShowS
showsPrec :: Int -> SnapshotCorruptedError -> ShowS
$cshow :: SnapshotCorruptedError -> String
show :: SnapshotCorruptedError -> String
$cshowList :: [SnapshotCorruptedError] -> ShowS
showList :: [SnapshotCorruptedError] -> ShowS
Show, SnapshotCorruptedError -> SnapshotCorruptedError -> Bool
(SnapshotCorruptedError -> SnapshotCorruptedError -> Bool)
-> (SnapshotCorruptedError -> SnapshotCorruptedError -> Bool)
-> Eq SnapshotCorruptedError
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: SnapshotCorruptedError -> SnapshotCorruptedError -> Bool
== :: SnapshotCorruptedError -> SnapshotCorruptedError -> Bool
$c/= :: SnapshotCorruptedError -> SnapshotCorruptedError -> Bool
/= :: SnapshotCorruptedError -> SnapshotCorruptedError -> Bool
Eq)
deriving anyclass (Show SnapshotCorruptedError
Typeable SnapshotCorruptedError
(Typeable SnapshotCorruptedError, Show SnapshotCorruptedError) =>
(SnapshotCorruptedError -> SomeException)
-> (SomeException -> Maybe SnapshotCorruptedError)
-> (SnapshotCorruptedError -> String)
-> Exception SnapshotCorruptedError
SomeException -> Maybe SnapshotCorruptedError
SnapshotCorruptedError -> String
SnapshotCorruptedError -> SomeException
forall e.
(Typeable e, Show e) =>
(e -> SomeException)
-> (SomeException -> Maybe e) -> (e -> String) -> Exception e
$ctoException :: SnapshotCorruptedError -> SomeException
toException :: SnapshotCorruptedError -> SomeException
$cfromException :: SomeException -> Maybe SnapshotCorruptedError
fromException :: SomeException -> Maybe SnapshotCorruptedError
$cdisplayException :: SnapshotCorruptedError -> String
displayException :: SnapshotCorruptedError -> String
Exception)
data SnapshotNotCompatibleError
=
ErrSnapshotWrongTableType
!SnapshotName
!SnapshotTableType
!SnapshotTableType
|
ErrSnapshotWrongLabel
!SnapshotName
!SnapshotLabel
!SnapshotLabel
deriving stock (Int -> SnapshotNotCompatibleError -> ShowS
[SnapshotNotCompatibleError] -> ShowS
SnapshotNotCompatibleError -> String
(Int -> SnapshotNotCompatibleError -> ShowS)
-> (SnapshotNotCompatibleError -> String)
-> ([SnapshotNotCompatibleError] -> ShowS)
-> Show SnapshotNotCompatibleError
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> SnapshotNotCompatibleError -> ShowS
showsPrec :: Int -> SnapshotNotCompatibleError -> ShowS
$cshow :: SnapshotNotCompatibleError -> String
show :: SnapshotNotCompatibleError -> String
$cshowList :: [SnapshotNotCompatibleError] -> ShowS
showList :: [SnapshotNotCompatibleError] -> ShowS
Show, SnapshotNotCompatibleError -> SnapshotNotCompatibleError -> Bool
(SnapshotNotCompatibleError -> SnapshotNotCompatibleError -> Bool)
-> (SnapshotNotCompatibleError
-> SnapshotNotCompatibleError -> Bool)
-> Eq SnapshotNotCompatibleError
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: SnapshotNotCompatibleError -> SnapshotNotCompatibleError -> Bool
== :: SnapshotNotCompatibleError -> SnapshotNotCompatibleError -> Bool
$c/= :: SnapshotNotCompatibleError -> SnapshotNotCompatibleError -> Bool
/= :: SnapshotNotCompatibleError -> SnapshotNotCompatibleError -> Bool
Eq)
deriving anyclass (Show SnapshotNotCompatibleError
Typeable SnapshotNotCompatibleError
(Typeable SnapshotNotCompatibleError,
Show SnapshotNotCompatibleError) =>
(SnapshotNotCompatibleError -> SomeException)
-> (SomeException -> Maybe SnapshotNotCompatibleError)
-> (SnapshotNotCompatibleError -> String)
-> Exception SnapshotNotCompatibleError
SomeException -> Maybe SnapshotNotCompatibleError
SnapshotNotCompatibleError -> String
SnapshotNotCompatibleError -> SomeException
forall e.
(Typeable e, Show e) =>
(e -> SomeException)
-> (SomeException -> Maybe e) -> (e -> String) -> Exception e
$ctoException :: SnapshotNotCompatibleError -> SomeException
toException :: SnapshotNotCompatibleError -> SomeException
$cfromException :: SomeException -> Maybe SnapshotNotCompatibleError
fromException :: SomeException -> Maybe SnapshotNotCompatibleError
$cdisplayException :: SnapshotNotCompatibleError -> String
displayException :: SnapshotNotCompatibleError -> String
Exception)
{-# SPECIALISE openSnapshot ::
Session IO h
-> SnapshotLabel
-> SnapshotTableType
-> TableConfigOverride
-> SnapshotName
-> ResolveSerialisedValue
-> IO (Table IO h) #-}
openSnapshot ::
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m)
=> Session m h
-> SnapshotLabel
-> SnapshotTableType
-> TableConfigOverride
-> SnapshotName
-> ResolveSerialisedValue
-> m (Table m h)
openSnapshot :: forall (m :: * -> *) h.
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m) =>
Session m h
-> SnapshotLabel
-> SnapshotTableType
-> TableConfigOverride
-> SnapshotName
-> ResolveSerialisedValue
-> m (Table m h)
openSnapshot Session m h
sesh SnapshotLabel
label SnapshotTableType
tableType TableConfigOverride
override SnapshotName
snap ResolveSerialisedValue
resolve =
SnapshotName -> m (Table m h) -> m (Table m h)
forall (m :: * -> *) a. MonadCatch m => SnapshotName -> m a -> m a
wrapFileCorruptedErrorAsSnapshotCorruptedError SnapshotName
snap (m (Table m h) -> m (Table m h)) -> m (Table m h) -> m (Table m h)
forall a b. (a -> b) -> a -> b
$ do
Tracer m LSMTreeTrace -> LSMTreeTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith (Session m h -> Tracer m LSMTreeTrace
forall (m :: * -> *) h. Session m h -> Tracer m LSMTreeTrace
sessionTracer Session m h
sesh) (LSMTreeTrace -> m ()) -> LSMTreeTrace -> m ()
forall a b. (a -> b) -> a -> b
$ SnapshotName -> TableConfigOverride -> LSMTreeTrace
TraceOpenSnapshot SnapshotName
snap TableConfigOverride
override
Session m h -> (SessionEnv m h -> m (Table m h)) -> m (Table m h)
forall (m :: * -> *) h a.
(MonadSTM m, MonadThrow m) =>
Session m h -> (SessionEnv m h -> m a) -> m a
withOpenSession Session m h
sesh ((SessionEnv m h -> m (Table m h)) -> m (Table m h))
-> (SessionEnv m h -> m (Table m h)) -> m (Table m h)
forall a b. (a -> b) -> a -> b
$ \SessionEnv m h
seshEnv -> do
(ActionRegistry m -> m (Table m h)) -> m (Table m h)
forall (m :: * -> *) a.
(PrimMonad m, MonadCatch m) =>
(ActionRegistry m -> m a) -> m a
withActionRegistry ((ActionRegistry m -> m (Table m h)) -> m (Table m h))
-> (ActionRegistry m -> m (Table m h)) -> m (Table m h)
forall a b. (a -> b) -> a -> b
$ \ActionRegistry m
reg -> do
let hfs :: HasFS m h
hfs = SessionEnv m h -> HasFS m h
forall (m :: * -> *) h. SessionEnv m h -> HasFS m h
sessionHasFS SessionEnv m h
seshEnv
hbio :: HasBlockIO m h
hbio = SessionEnv m h -> HasBlockIO m h
forall (m :: * -> *) h. SessionEnv m h -> HasBlockIO m h
sessionHasBlockIO SessionEnv m h
seshEnv
uc :: UniqCounter m
uc = SessionEnv m h -> UniqCounter m
forall (m :: * -> *) h. SessionEnv m h -> UniqCounter m
sessionUniqCounter SessionEnv m h
seshEnv
let snapDir :: NamedSnapshotDir
snapDir = SessionRoot -> SnapshotName -> NamedSnapshotDir
Paths.namedSnapshotDir (SessionEnv m h -> SessionRoot
forall (m :: * -> *) h. SessionEnv m h -> SessionRoot
sessionRoot SessionEnv m h
seshEnv) SnapshotName
snap
HasFS m h -> HasCallStack => FsPath -> m Bool
forall (m :: * -> *) h.
HasFS m h -> HasCallStack => FsPath -> m Bool
FS.doesDirectoryExist HasFS m h
hfs (NamedSnapshotDir -> FsPath
Paths.getNamedSnapshotDir NamedSnapshotDir
snapDir) m Bool -> (Bool -> m ()) -> m ()
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \Bool
b ->
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
b (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ SnapshotDoesNotExistError -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (SnapshotName -> SnapshotDoesNotExistError
ErrSnapshotDoesNotExist SnapshotName
snap)
let SnapshotMetaDataFile FsPath
contentPath = NamedSnapshotDir -> SnapshotMetaDataFile
Paths.snapshotMetaDataFile NamedSnapshotDir
snapDir
SnapshotMetaDataChecksumFile FsPath
checksumPath = NamedSnapshotDir -> SnapshotMetaDataChecksumFile
Paths.snapshotMetaDataChecksumFile NamedSnapshotDir
snapDir
SnapshotMetaData
snapMetaData <- HasFS m h -> FsPath -> FsPath -> m SnapshotMetaData
forall (m :: * -> *) h.
MonadThrow m =>
HasFS m h -> FsPath -> FsPath -> m SnapshotMetaData
readFileSnapshotMetaData HasFS m h
hfs FsPath
contentPath FsPath
checksumPath
let SnapshotMetaData SnapshotLabel
label' SnapshotTableType
tableType' TableConfig
conf RunNumber
snapWriteBuffer SnapLevels SnapshotRun
snapLevels Maybe (SnapMergingTree SnapshotRun)
mTreeOpt = SnapshotMetaData
snapMetaData
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (SnapshotTableType
tableType SnapshotTableType -> SnapshotTableType -> Bool
forall a. Eq a => a -> a -> Bool
== SnapshotTableType
tableType') (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
SnapshotNotCompatibleError -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (SnapshotName
-> SnapshotTableType
-> SnapshotTableType
-> SnapshotNotCompatibleError
ErrSnapshotWrongTableType SnapshotName
snap SnapshotTableType
tableType SnapshotTableType
tableType')
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (SnapshotLabel
label SnapshotLabel -> SnapshotLabel -> Bool
forall a. Eq a => a -> a -> Bool
== SnapshotLabel
label') (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
SnapshotNotCompatibleError -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (SnapshotName
-> SnapshotLabel -> SnapshotLabel -> SnapshotNotCompatibleError
ErrSnapshotWrongLabel SnapshotName
snap SnapshotLabel
label SnapshotLabel
label')
let conf' :: TableConfig
conf' = TableConfigOverride -> TableConfig -> TableConfig
applyOverride TableConfigOverride
override TableConfig
conf
ArenaManager (PrimState m)
am <- m (ArenaManager (PrimState m))
forall (m :: * -> *). PrimMonad m => m (ArenaManager (PrimState m))
newArenaManager
let activeDir :: ActiveDir
activeDir = SessionRoot -> ActiveDir
Paths.activeDir (SessionEnv m h -> SessionRoot
forall (m :: * -> *) h. SessionEnv m h -> SessionRoot
sessionRoot SessionEnv m h
seshEnv)
let snapWriteBufferPaths :: WriteBufferFsPaths
snapWriteBufferPaths = FsPath -> RunNumber -> WriteBufferFsPaths
Paths.WriteBufferFsPaths (NamedSnapshotDir -> FsPath
Paths.getNamedSnapshotDir NamedSnapshotDir
snapDir) RunNumber
snapWriteBuffer
(WriteBuffer
tableWriteBuffer, Ref (WriteBufferBlobs m h)
tableWriteBufferBlobs) <-
ActionRegistry m
-> ResolveSerialisedValue
-> HasFS m h
-> HasBlockIO m h
-> UniqCounter m
-> ActiveDir
-> WriteBufferFsPaths
-> m (WriteBuffer, Ref (WriteBufferBlobs m h))
forall (m :: * -> *) h.
(MonadMVar m, MonadMask m, MonadSTM m, MonadST m) =>
ActionRegistry m
-> ResolveSerialisedValue
-> HasFS m h
-> HasBlockIO m h
-> UniqCounter m
-> ActiveDir
-> WriteBufferFsPaths
-> m (WriteBuffer, Ref (WriteBufferBlobs m h))
openWriteBuffer ActionRegistry m
reg ResolveSerialisedValue
resolve HasFS m h
hfs HasBlockIO m h
hbio UniqCounter m
uc ActiveDir
activeDir WriteBufferFsPaths
snapWriteBufferPaths
SnapLevels (Ref (Run m h))
snapLevels' <- (SnapshotRun -> m (Ref (Run m h)))
-> SnapLevels SnapshotRun -> m (SnapLevels (Ref (Run m h)))
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> SnapLevels a -> f (SnapLevels b)
traverse (HasFS m h
-> HasBlockIO m h
-> UniqCounter m
-> ActionRegistry m
-> NamedSnapshotDir
-> ActiveDir
-> SnapshotRun
-> m (Ref (Run m h))
forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadST m) =>
HasFS m h
-> HasBlockIO m h
-> UniqCounter m
-> ActionRegistry m
-> NamedSnapshotDir
-> ActiveDir
-> SnapshotRun
-> m (Ref (Run m h))
openRun HasFS m h
hfs HasBlockIO m h
hbio UniqCounter m
uc ActionRegistry m
reg NamedSnapshotDir
snapDir ActiveDir
activeDir) SnapLevels SnapshotRun
snapLevels
UnionLevel m h
unionLevel <- case Maybe (SnapMergingTree SnapshotRun)
mTreeOpt of
Maybe (SnapMergingTree SnapshotRun)
Nothing -> UnionLevel m h -> m (UnionLevel m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure UnionLevel m h
forall (m :: * -> *) h. UnionLevel m h
NoUnion
Just SnapMergingTree SnapshotRun
mTree -> do
SnapMergingTree (Ref (Run m h))
snapTree <- (SnapshotRun -> m (Ref (Run m h)))
-> SnapMergingTree SnapshotRun
-> m (SnapMergingTree (Ref (Run m h)))
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> SnapMergingTree a -> f (SnapMergingTree b)
traverse (HasFS m h
-> HasBlockIO m h
-> UniqCounter m
-> ActionRegistry m
-> NamedSnapshotDir
-> ActiveDir
-> SnapshotRun
-> m (Ref (Run m h))
forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadST m) =>
HasFS m h
-> HasBlockIO m h
-> UniqCounter m
-> ActionRegistry m
-> NamedSnapshotDir
-> ActiveDir
-> SnapshotRun
-> m (Ref (Run m h))
openRun HasFS m h
hfs HasBlockIO m h
hbio UniqCounter m
uc ActionRegistry m
reg NamedSnapshotDir
snapDir ActiveDir
activeDir) SnapMergingTree SnapshotRun
mTree
Ref (MergingTree m h)
mt <- HasFS m h
-> HasBlockIO m h
-> UniqCounter m
-> ResolveSerialisedValue
-> ActiveDir
-> ActionRegistry m
-> SnapMergingTree (Ref (Run m h))
-> m (Ref (MergingTree m h))
forall (m :: * -> *) h.
(MonadMask m, MonadMVar m, MonadSTM m, MonadST m) =>
HasFS m h
-> HasBlockIO m h
-> UniqCounter m
-> ResolveSerialisedValue
-> ActiveDir
-> ActionRegistry m
-> SnapMergingTree (Ref (Run m h))
-> m (Ref (MergingTree m h))
fromSnapMergingTree HasFS m h
hfs HasBlockIO m h
hbio UniqCounter m
uc ResolveSerialisedValue
resolve ActiveDir
activeDir ActionRegistry m
reg SnapMergingTree (Ref (Run m h))
snapTree
(Ref (Run m h) -> m ()) -> SnapMergingTree (Ref (Run m h)) -> m ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ (ActionRegistry m -> m () -> m ()
forall (m :: * -> *).
(PrimMonad m, HasCallStack) =>
ActionRegistry m -> m () -> m ()
delayedCommit ActionRegistry m
reg (m () -> m ()) -> (Ref (Run m h) -> m ()) -> Ref (Run m h) -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Ref (Run m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef) SnapMergingTree (Ref (Run m h))
snapTree
UnionLevel m h -> m (UnionLevel m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Ref (MergingTree m h) -> UnionLevel m h
forall (m :: * -> *) h. Ref (MergingTree m h) -> UnionLevel m h
Union Ref (MergingTree m h)
mt)
Levels m h
tableLevels <- HasFS m h
-> HasBlockIO m h
-> UniqCounter m
-> TableConfig
-> ResolveSerialisedValue
-> ActionRegistry m
-> ActiveDir
-> SnapLevels (Ref (Run m h))
-> m (Levels m h)
forall (m :: * -> *) h.
(MonadMask m, MonadMVar m, MonadSTM m, MonadST m) =>
HasFS m h
-> HasBlockIO m h
-> UniqCounter m
-> TableConfig
-> ResolveSerialisedValue
-> ActionRegistry m
-> ActiveDir
-> SnapLevels (Ref (Run m h))
-> m (Levels m h)
fromSnapLevels HasFS m h
hfs HasBlockIO m h
hbio UniqCounter m
uc TableConfig
conf ResolveSerialisedValue
resolve ActionRegistry m
reg ActiveDir
activeDir SnapLevels (Ref (Run m h))
snapLevels'
(Ref (Run m h) -> m ()) -> SnapLevels (Ref (Run m h)) -> m ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ (ActionRegistry m -> m () -> m ()
forall (m :: * -> *).
(PrimMonad m, HasCallStack) =>
ActionRegistry m -> m () -> m ()
delayedCommit ActionRegistry m
reg (m () -> m ()) -> (Ref (Run m h) -> m ()) -> Ref (Run m h) -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Ref (Run m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef) SnapLevels (Ref (Run m h))
snapLevels'
LevelsCache m h
tableCache <- ActionRegistry m -> Levels m h -> m (LevelsCache m h)
forall (m :: * -> *) h.
(PrimMonad m, MonadMVar m, MonadMask m) =>
ActionRegistry m -> Levels m h -> m (LevelsCache m h)
mkLevelsCache ActionRegistry m
reg Levels m h
tableLevels
ActionRegistry m
-> Session m h
-> SessionEnv m h
-> TableConfig
-> ArenaManager (PrimState m)
-> TableContent m h
-> m (Table m h)
forall (m :: * -> *) h.
(MonadSTM m, MonadMVar m, PrimMonad m) =>
ActionRegistry m
-> Session m h
-> SessionEnv m h
-> TableConfig
-> ArenaManager (PrimState m)
-> TableContent m h
-> m (Table m h)
newWith ActionRegistry m
reg Session m h
sesh SessionEnv m h
seshEnv TableConfig
conf' ArenaManager (PrimState m)
am (TableContent m h -> m (Table m h))
-> TableContent m h -> m (Table m h)
forall a b. (a -> b) -> a -> b
$! TableContent {
WriteBuffer
tableWriteBuffer :: WriteBuffer
tableWriteBuffer :: WriteBuffer
tableWriteBuffer
, Ref (WriteBufferBlobs m h)
tableWriteBufferBlobs :: Ref (WriteBufferBlobs m h)
tableWriteBufferBlobs :: Ref (WriteBufferBlobs m h)
tableWriteBufferBlobs
, Levels m h
tableLevels :: Levels m h
tableLevels :: Levels m h
tableLevels
, LevelsCache m h
tableCache :: LevelsCache m h
tableCache :: LevelsCache m h
tableCache
, tableUnionLevel :: UnionLevel m h
tableUnionLevel = UnionLevel m h
unionLevel
}
{-# SPECIALISE wrapFileCorruptedErrorAsSnapshotCorruptedError ::
SnapshotName
-> IO a
-> IO a
#-}
wrapFileCorruptedErrorAsSnapshotCorruptedError ::
forall m a.
(MonadCatch m)
=> SnapshotName
-> m a
-> m a
wrapFileCorruptedErrorAsSnapshotCorruptedError :: forall (m :: * -> *) a. MonadCatch m => SnapshotName -> m a -> m a
wrapFileCorruptedErrorAsSnapshotCorruptedError SnapshotName
snapshotName m a
action =
m a
action m a -> [Handler m a] -> m a
forall (m :: * -> *) a. MonadCatch m => m a -> [Handler m a] -> m a
`catches` [Handler m a]
handlers
where
handlers :: [Handler m a]
handlers :: [Handler m a]
handlers =
[ (FileCorruptedError -> m a) -> Handler m a
forall (m :: * -> *) a e. Exception e => (e -> m a) -> Handler m a
Handler ((FileCorruptedError -> m a) -> Handler m a)
-> (FileCorruptedError -> m a) -> Handler m a
forall a b. (a -> b) -> a -> b
$ SnapshotCorruptedError -> m a
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (SnapshotCorruptedError -> m a)
-> (FileCorruptedError -> SnapshotCorruptedError)
-> FileCorruptedError
-> m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. FileCorruptedError -> SnapshotCorruptedError
wrapFileCorruptedError
, (AbortActionRegistryError -> m a) -> Handler m a
forall (m :: * -> *) a e. Exception e => (e -> m a) -> Handler m a
Handler ((AbortActionRegistryError -> m a) -> Handler m a)
-> (AbortActionRegistryError -> m a) -> Handler m a
forall a b. (a -> b) -> a -> b
$ AbortActionRegistryError -> m a
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (AbortActionRegistryError -> m a)
-> (AbortActionRegistryError -> AbortActionRegistryError)
-> AbortActionRegistryError
-> m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. AbortActionRegistryError -> AbortActionRegistryError
wrapAbortActionRegistryError
, (CommitActionRegistryError -> m a) -> Handler m a
forall (m :: * -> *) a e. Exception e => (e -> m a) -> Handler m a
Handler ((CommitActionRegistryError -> m a) -> Handler m a)
-> (CommitActionRegistryError -> m a) -> Handler m a
forall a b. (a -> b) -> a -> b
$ CommitActionRegistryError -> m a
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (CommitActionRegistryError -> m a)
-> (CommitActionRegistryError -> CommitActionRegistryError)
-> CommitActionRegistryError
-> m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. CommitActionRegistryError -> CommitActionRegistryError
wrapCommitActionRegistryError
]
wrapSomeException :: SomeException -> SomeException
wrapSomeException :: SomeException -> SomeException
wrapSomeException SomeException
e =
SomeException -> Maybe SomeException -> SomeException
forall a. a -> Maybe a -> a
fromMaybe SomeException
e (Maybe SomeException -> SomeException)
-> ([Maybe SomeException] -> Maybe SomeException)
-> [Maybe SomeException]
-> SomeException
forall b c a. (b -> c) -> (a -> b) -> a -> c
. First SomeException -> Maybe SomeException
forall a. First a -> Maybe a
getFirst (First SomeException -> Maybe SomeException)
-> ([Maybe SomeException] -> First SomeException)
-> [Maybe SomeException]
-> Maybe SomeException
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [First SomeException] -> First SomeException
forall a. Monoid a => [a] -> a
mconcat ([First SomeException] -> First SomeException)
-> ([Maybe SomeException] -> [First SomeException])
-> [Maybe SomeException]
-> First SomeException
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Maybe SomeException -> First SomeException)
-> [Maybe SomeException] -> [First SomeException]
forall a b. (a -> b) -> [a] -> [b]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Maybe SomeException -> First SomeException
forall a. Maybe a -> First a
First ([Maybe SomeException] -> SomeException)
-> [Maybe SomeException] -> SomeException
forall a b. (a -> b) -> a -> b
$
[ SnapshotCorruptedError -> SomeException
forall e. Exception e => e -> SomeException
toException (SnapshotCorruptedError -> SomeException)
-> (FileCorruptedError -> SnapshotCorruptedError)
-> FileCorruptedError
-> SomeException
forall b c a. (b -> c) -> (a -> b) -> a -> c
. FileCorruptedError -> SnapshotCorruptedError
wrapFileCorruptedError (FileCorruptedError -> SomeException)
-> Maybe FileCorruptedError -> Maybe SomeException
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> SomeException -> Maybe FileCorruptedError
forall e. Exception e => SomeException -> Maybe e
fromException SomeException
e
, AbortActionRegistryError -> SomeException
forall e. Exception e => e -> SomeException
toException (AbortActionRegistryError -> SomeException)
-> (AbortActionRegistryError -> AbortActionRegistryError)
-> AbortActionRegistryError
-> SomeException
forall b c a. (b -> c) -> (a -> b) -> a -> c
. AbortActionRegistryError -> AbortActionRegistryError
wrapAbortActionRegistryError (AbortActionRegistryError -> SomeException)
-> Maybe AbortActionRegistryError -> Maybe SomeException
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> SomeException -> Maybe AbortActionRegistryError
forall e. Exception e => SomeException -> Maybe e
fromException SomeException
e
, CommitActionRegistryError -> SomeException
forall e. Exception e => e -> SomeException
toException (CommitActionRegistryError -> SomeException)
-> (CommitActionRegistryError -> CommitActionRegistryError)
-> CommitActionRegistryError
-> SomeException
forall b c a. (b -> c) -> (a -> b) -> a -> c
. CommitActionRegistryError -> CommitActionRegistryError
wrapCommitActionRegistryError (CommitActionRegistryError -> SomeException)
-> Maybe CommitActionRegistryError -> Maybe SomeException
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> SomeException -> Maybe CommitActionRegistryError
forall e. Exception e => SomeException -> Maybe e
fromException SomeException
e
]
wrapFileCorruptedError :: FileCorruptedError -> SnapshotCorruptedError
wrapFileCorruptedError :: FileCorruptedError -> SnapshotCorruptedError
wrapFileCorruptedError = SnapshotName -> FileCorruptedError -> SnapshotCorruptedError
ErrSnapshotCorrupted SnapshotName
snapshotName
wrapAbortActionRegistryError :: AbortActionRegistryError -> AbortActionRegistryError
wrapAbortActionRegistryError :: AbortActionRegistryError -> AbortActionRegistryError
wrapAbortActionRegistryError = \case
AbortActionRegistryError AbortActionRegistryReason
reason NonEmpty ActionError
es ->
AbortActionRegistryReason
-> NonEmpty ActionError -> AbortActionRegistryError
AbortActionRegistryError (AbortActionRegistryReason -> AbortActionRegistryReason
wrapAbortActionRegistryReason AbortActionRegistryReason
reason) ((SomeException -> SomeException) -> ActionError -> ActionError
mapActionError SomeException -> SomeException
wrapSomeException (ActionError -> ActionError)
-> NonEmpty ActionError -> NonEmpty ActionError
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> NonEmpty ActionError
es)
wrapAbortActionRegistryReason :: AbortActionRegistryReason -> AbortActionRegistryReason
wrapAbortActionRegistryReason :: AbortActionRegistryReason -> AbortActionRegistryReason
wrapAbortActionRegistryReason = \case
ReasonExitCaseException SomeException
e -> SomeException -> AbortActionRegistryReason
ReasonExitCaseException (SomeException -> SomeException
wrapSomeException SomeException
e)
AbortActionRegistryReason
ReasonExitCaseAbort -> AbortActionRegistryReason
ReasonExitCaseAbort
wrapCommitActionRegistryError :: CommitActionRegistryError -> CommitActionRegistryError
wrapCommitActionRegistryError :: CommitActionRegistryError -> CommitActionRegistryError
wrapCommitActionRegistryError = \case
CommitActionRegistryError NonEmpty ActionError
es ->
NonEmpty ActionError -> CommitActionRegistryError
CommitActionRegistryError ((SomeException -> SomeException) -> ActionError -> ActionError
mapActionError SomeException -> SomeException
wrapSomeException (ActionError -> ActionError)
-> NonEmpty ActionError -> NonEmpty ActionError
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> NonEmpty ActionError
es)
{-# SPECIALISE doesSnapshotExist ::
Session IO h
-> SnapshotName
-> IO Bool #-}
doesSnapshotExist ::
(MonadMask m, MonadSTM m)
=> Session m h
-> SnapshotName
-> m Bool
doesSnapshotExist :: forall (m :: * -> *) h.
(MonadMask m, MonadSTM m) =>
Session m h -> SnapshotName -> m Bool
doesSnapshotExist Session m h
sesh SnapshotName
snap = Session m h -> (SessionEnv m h -> m Bool) -> m Bool
forall (m :: * -> *) h a.
(MonadSTM m, MonadThrow m) =>
Session m h -> (SessionEnv m h -> m a) -> m a
withOpenSession Session m h
sesh (SnapshotName -> SessionEnv m h -> m Bool
forall (m :: * -> *) h. SnapshotName -> SessionEnv m h -> m Bool
doesSnapshotDirExist SnapshotName
snap)
doesSnapshotDirExist :: SnapshotName -> SessionEnv m h -> m Bool
doesSnapshotDirExist :: forall (m :: * -> *) h. SnapshotName -> SessionEnv m h -> m Bool
doesSnapshotDirExist SnapshotName
snap SessionEnv m h
seshEnv = do
let snapDir :: NamedSnapshotDir
snapDir = SessionRoot -> SnapshotName -> NamedSnapshotDir
Paths.namedSnapshotDir (SessionEnv m h -> SessionRoot
forall (m :: * -> *) h. SessionEnv m h -> SessionRoot
sessionRoot SessionEnv m h
seshEnv) SnapshotName
snap
HasFS m h -> HasCallStack => FsPath -> m Bool
forall (m :: * -> *) h.
HasFS m h -> HasCallStack => FsPath -> m Bool
FS.doesDirectoryExist (SessionEnv m h -> HasFS m h
forall (m :: * -> *) h. SessionEnv m h -> HasFS m h
sessionHasFS SessionEnv m h
seshEnv) (NamedSnapshotDir -> FsPath
Paths.getNamedSnapshotDir NamedSnapshotDir
snapDir)
{-# SPECIALISE deleteSnapshot ::
Session IO h
-> SnapshotName
-> IO () #-}
deleteSnapshot ::
(MonadMask m, MonadSTM m)
=> Session m h
-> SnapshotName
-> m ()
deleteSnapshot :: forall (m :: * -> *) h.
(MonadMask m, MonadSTM m) =>
Session m h -> SnapshotName -> m ()
deleteSnapshot Session m h
sesh SnapshotName
snap = do
Tracer m LSMTreeTrace -> LSMTreeTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith (Session m h -> Tracer m LSMTreeTrace
forall (m :: * -> *) h. Session m h -> Tracer m LSMTreeTrace
sessionTracer Session m h
sesh) (LSMTreeTrace -> m ()) -> LSMTreeTrace -> m ()
forall a b. (a -> b) -> a -> b
$ SnapshotName -> LSMTreeTrace
TraceDeleteSnapshot SnapshotName
snap
Session m h -> (SessionEnv m h -> m ()) -> m ()
forall (m :: * -> *) h a.
(MonadSTM m, MonadThrow m) =>
Session m h -> (SessionEnv m h -> m a) -> m a
withOpenSession Session m h
sesh ((SessionEnv m h -> m ()) -> m ())
-> (SessionEnv m h -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \SessionEnv m h
seshEnv -> do
let snapDir :: NamedSnapshotDir
snapDir = SessionRoot -> SnapshotName -> NamedSnapshotDir
Paths.namedSnapshotDir (SessionEnv m h -> SessionRoot
forall (m :: * -> *) h. SessionEnv m h -> SessionRoot
sessionRoot SessionEnv m h
seshEnv) SnapshotName
snap
Bool
snapshotExists <- SnapshotName -> SessionEnv m h -> m Bool
forall (m :: * -> *) h. SnapshotName -> SessionEnv m h -> m Bool
doesSnapshotDirExist SnapshotName
snap SessionEnv m h
seshEnv
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
snapshotExists (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ SnapshotDoesNotExistError -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (SnapshotName -> SnapshotDoesNotExistError
ErrSnapshotDoesNotExist SnapshotName
snap)
HasFS m h -> HasCallStack => FsPath -> m ()
forall (m :: * -> *) h. HasFS m h -> HasCallStack => FsPath -> m ()
FS.removeDirectoryRecursive (SessionEnv m h -> HasFS m h
forall (m :: * -> *) h. SessionEnv m h -> HasFS m h
sessionHasFS SessionEnv m h
seshEnv) (NamedSnapshotDir -> FsPath
Paths.getNamedSnapshotDir NamedSnapshotDir
snapDir)
{-# SPECIALISE listSnapshots :: Session IO h -> IO [SnapshotName] #-}
listSnapshots ::
(MonadMask m, MonadSTM m)
=> Session m h
-> m [SnapshotName]
listSnapshots :: forall (m :: * -> *) h.
(MonadMask m, MonadSTM m) =>
Session m h -> m [SnapshotName]
listSnapshots Session m h
sesh = do
Tracer m LSMTreeTrace -> LSMTreeTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith (Session m h -> Tracer m LSMTreeTrace
forall (m :: * -> *) h. Session m h -> Tracer m LSMTreeTrace
sessionTracer Session m h
sesh) LSMTreeTrace
TraceListSnapshots
Session m h
-> (SessionEnv m h -> m [SnapshotName]) -> m [SnapshotName]
forall (m :: * -> *) h a.
(MonadSTM m, MonadThrow m) =>
Session m h -> (SessionEnv m h -> m a) -> m a
withOpenSession Session m h
sesh ((SessionEnv m h -> m [SnapshotName]) -> m [SnapshotName])
-> (SessionEnv m h -> m [SnapshotName]) -> m [SnapshotName]
forall a b. (a -> b) -> a -> b
$ \SessionEnv m h
seshEnv -> do
let hfs :: HasFS m h
hfs = SessionEnv m h -> HasFS m h
forall (m :: * -> *) h. SessionEnv m h -> HasFS m h
sessionHasFS SessionEnv m h
seshEnv
root :: SessionRoot
root = SessionEnv m h -> SessionRoot
forall (m :: * -> *) h. SessionEnv m h -> SessionRoot
sessionRoot SessionEnv m h
seshEnv
Set String
contents <- HasFS m h -> HasCallStack => FsPath -> m (Set String)
forall (m :: * -> *) h.
HasFS m h -> HasCallStack => FsPath -> m (Set String)
FS.listDirectory HasFS m h
hfs (SessionRoot -> FsPath
Paths.snapshotsDir (SessionEnv m h -> SessionRoot
forall (m :: * -> *) h. SessionEnv m h -> SessionRoot
sessionRoot SessionEnv m h
seshEnv))
[Maybe SnapshotName]
snaps <- (String -> m (Maybe SnapshotName))
-> [String] -> m [Maybe SnapshotName]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> [a] -> m [b]
mapM (HasFS m h -> SessionRoot -> String -> m (Maybe SnapshotName)
forall {m :: * -> *} {h}.
Monad m =>
HasFS m h -> SessionRoot -> String -> m (Maybe SnapshotName)
checkSnapshot HasFS m h
hfs SessionRoot
root) ([String] -> m [Maybe SnapshotName])
-> [String] -> m [Maybe SnapshotName]
forall a b. (a -> b) -> a -> b
$ Set String -> [String]
forall a. Set a -> [a]
Set.toList Set String
contents
[SnapshotName] -> m [SnapshotName]
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ([SnapshotName] -> m [SnapshotName])
-> [SnapshotName] -> m [SnapshotName]
forall a b. (a -> b) -> a -> b
$ [Maybe SnapshotName] -> [SnapshotName]
forall a. [Maybe a] -> [a]
catMaybes [Maybe SnapshotName]
snaps
where
checkSnapshot :: HasFS m h -> SessionRoot -> String -> m (Maybe SnapshotName)
checkSnapshot HasFS m h
hfs SessionRoot
root String
s = do
let snap :: SnapshotName
snap = String -> SnapshotName
Paths.toSnapshotName String
s
Bool
b <- HasFS m h -> HasCallStack => FsPath -> m Bool
forall (m :: * -> *) h.
HasFS m h -> HasCallStack => FsPath -> m Bool
FS.doesDirectoryExist HasFS m h
hfs
(NamedSnapshotDir -> FsPath
Paths.getNamedSnapshotDir (NamedSnapshotDir -> FsPath) -> NamedSnapshotDir -> FsPath
forall a b. (a -> b) -> a -> b
$ SessionRoot -> SnapshotName -> NamedSnapshotDir
Paths.namedSnapshotDir SessionRoot
root SnapshotName
snap)
if Bool
b then Maybe SnapshotName -> m (Maybe SnapshotName)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe SnapshotName -> m (Maybe SnapshotName))
-> Maybe SnapshotName -> m (Maybe SnapshotName)
forall a b. (a -> b) -> a -> b
$ SnapshotName -> Maybe SnapshotName
forall a. a -> Maybe a
Just SnapshotName
snap
else Maybe SnapshotName -> m (Maybe SnapshotName)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe SnapshotName -> m (Maybe SnapshotName))
-> Maybe SnapshotName -> m (Maybe SnapshotName)
forall a b. (a -> b) -> a -> b
$ Maybe SnapshotName
forall a. Maybe a
Nothing
{-# SPECIALISE duplicate :: Table IO h -> IO (Table IO h) #-}
duplicate ::
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m)
=> Table m h
-> m (Table m h)
duplicate :: forall (m :: * -> *) h.
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m) =>
Table m h -> m (Table m h)
duplicate t :: Table m h
t@Table{Tracer m TableTrace
RWVar m (TableState m h)
ArenaManager (PrimState m)
TableId
TableConfig
Session m h
tableConfig :: forall (m :: * -> *) h. Table m h -> TableConfig
tableState :: forall (m :: * -> *) h. Table m h -> RWVar m (TableState m h)
tableArenaManager :: forall (m :: * -> *) h. Table m h -> ArenaManager (PrimState m)
tableTracer :: forall (m :: * -> *) h. Table m h -> Tracer m TableTrace
tableId :: forall (m :: * -> *) h. Table m h -> TableId
tableSession :: forall (m :: * -> *) h. Table m h -> Session m h
tableConfig :: TableConfig
tableState :: RWVar m (TableState m h)
tableArenaManager :: ArenaManager (PrimState m)
tableTracer :: Tracer m TableTrace
tableId :: TableId
tableSession :: Session m h
..} = do
Tracer m TableTrace -> TableTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith Tracer m TableTrace
tableTracer TableTrace
TraceDuplicate
Table m h -> (TableEnv m h -> m (Table m h)) -> m (Table m h)
forall (m :: * -> *) h a.
(MonadSTM m, MonadThrow m) =>
Table m h -> (TableEnv m h -> m a) -> m a
withOpenTable Table m h
t ((TableEnv m h -> m (Table m h)) -> m (Table m h))
-> (TableEnv m h -> m (Table m h)) -> m (Table m h)
forall a b. (a -> b) -> a -> b
$ \TableEnv{RWVar m (TableContent m h)
SessionEnv m h
tableSessionEnv :: forall (m :: * -> *) h. TableEnv m h -> SessionEnv m h
tableContent :: forall (m :: * -> *) h. TableEnv m h -> RWVar m (TableContent m h)
tableSessionEnv :: SessionEnv m h
tableContent :: RWVar m (TableContent m h)
..} -> do
Session m h -> (SessionEnv m h -> m (Table m h)) -> m (Table m h)
forall (m :: * -> *) h a.
(MonadSTM m, MonadThrow m) =>
Session m h -> (SessionEnv m h -> m a) -> m a
withOpenSession Session m h
tableSession ((SessionEnv m h -> m (Table m h)) -> m (Table m h))
-> (SessionEnv m h -> m (Table m h)) -> m (Table m h)
forall a b. (a -> b) -> a -> b
$ \SessionEnv m h
_ -> do
(ActionRegistry m -> m (Table m h)) -> m (Table m h)
forall (m :: * -> *) a.
(PrimMonad m, MonadCatch m) =>
(ActionRegistry m -> m a) -> m a
withActionRegistry ((ActionRegistry m -> m (Table m h)) -> m (Table m h))
-> (ActionRegistry m -> m (Table m h)) -> m (Table m h)
forall a b. (a -> b) -> a -> b
$ \ActionRegistry m
reg -> do
TableContent m h
content <- RWVar m (TableContent m h)
-> (TableContent m h -> m (TableContent m h))
-> m (TableContent m h)
forall (m :: * -> *) a b.
(MonadSTM m, MonadThrow m) =>
RWVar m a -> (a -> m b) -> m b
RW.withReadAccess RWVar m (TableContent m h)
tableContent (ActionRegistry m -> TableContent m h -> m (TableContent m h)
forall (m :: * -> *) h.
(PrimMonad m, MonadMask m) =>
ActionRegistry m -> TableContent m h -> m (TableContent m h)
duplicateTableContent ActionRegistry m
reg)
ActionRegistry m
-> Session m h
-> SessionEnv m h
-> TableConfig
-> ArenaManager (PrimState m)
-> TableContent m h
-> m (Table m h)
forall (m :: * -> *) h.
(MonadSTM m, MonadMVar m, PrimMonad m) =>
ActionRegistry m
-> Session m h
-> SessionEnv m h
-> TableConfig
-> ArenaManager (PrimState m)
-> TableContent m h
-> m (Table m h)
newWith
ActionRegistry m
reg
Session m h
tableSession
SessionEnv m h
tableSessionEnv
TableConfig
tableConfig
ArenaManager (PrimState m)
tableArenaManager
TableContent m h
content
data TableUnionNotCompatibleError
= ErrTableUnionHandleTypeMismatch
!Int
!TypeRep
!Int
!TypeRep
| ErrTableUnionSessionMismatch
!Int
!FsErrorPath
!Int
!FsErrorPath
deriving stock (Int -> TableUnionNotCompatibleError -> ShowS
[TableUnionNotCompatibleError] -> ShowS
TableUnionNotCompatibleError -> String
(Int -> TableUnionNotCompatibleError -> ShowS)
-> (TableUnionNotCompatibleError -> String)
-> ([TableUnionNotCompatibleError] -> ShowS)
-> Show TableUnionNotCompatibleError
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> TableUnionNotCompatibleError -> ShowS
showsPrec :: Int -> TableUnionNotCompatibleError -> ShowS
$cshow :: TableUnionNotCompatibleError -> String
show :: TableUnionNotCompatibleError -> String
$cshowList :: [TableUnionNotCompatibleError] -> ShowS
showList :: [TableUnionNotCompatibleError] -> ShowS
Show, TableUnionNotCompatibleError
-> TableUnionNotCompatibleError -> Bool
(TableUnionNotCompatibleError
-> TableUnionNotCompatibleError -> Bool)
-> (TableUnionNotCompatibleError
-> TableUnionNotCompatibleError -> Bool)
-> Eq TableUnionNotCompatibleError
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: TableUnionNotCompatibleError
-> TableUnionNotCompatibleError -> Bool
== :: TableUnionNotCompatibleError
-> TableUnionNotCompatibleError -> Bool
$c/= :: TableUnionNotCompatibleError
-> TableUnionNotCompatibleError -> Bool
/= :: TableUnionNotCompatibleError
-> TableUnionNotCompatibleError -> Bool
Eq)
deriving anyclass (Show TableUnionNotCompatibleError
Typeable TableUnionNotCompatibleError
(Typeable TableUnionNotCompatibleError,
Show TableUnionNotCompatibleError) =>
(TableUnionNotCompatibleError -> SomeException)
-> (SomeException -> Maybe TableUnionNotCompatibleError)
-> (TableUnionNotCompatibleError -> String)
-> Exception TableUnionNotCompatibleError
SomeException -> Maybe TableUnionNotCompatibleError
TableUnionNotCompatibleError -> String
TableUnionNotCompatibleError -> SomeException
forall e.
(Typeable e, Show e) =>
(e -> SomeException)
-> (SomeException -> Maybe e) -> (e -> String) -> Exception e
$ctoException :: TableUnionNotCompatibleError -> SomeException
toException :: TableUnionNotCompatibleError -> SomeException
$cfromException :: SomeException -> Maybe TableUnionNotCompatibleError
fromException :: SomeException -> Maybe TableUnionNotCompatibleError
$cdisplayException :: TableUnionNotCompatibleError -> String
displayException :: TableUnionNotCompatibleError -> String
Exception)
{-# SPECIALISE unions :: NonEmpty (Table IO h) -> IO (Table IO h) #-}
unions ::
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m)
=> NonEmpty (Table m h)
-> m (Table m h)
unions :: forall (m :: * -> *) h.
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m) =>
NonEmpty (Table m h) -> m (Table m h)
unions NonEmpty (Table m h)
ts = do
Session m h
sesh <- NonEmpty (Table m h) -> m (Session m h)
forall (m :: * -> *) h.
(MonadSTM m, MonadThrow m) =>
NonEmpty (Table m h) -> m (Session m h)
ensureSessionsMatch NonEmpty (Table m h)
ts
Tracer m LSMTreeTrace -> LSMTreeTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith (Session m h -> Tracer m LSMTreeTrace
forall (m :: * -> *) h. Session m h -> Tracer m LSMTreeTrace
sessionTracer Session m h
sesh) (LSMTreeTrace -> m ()) -> LSMTreeTrace -> m ()
forall a b. (a -> b) -> a -> b
$ NonEmpty TableId -> LSMTreeTrace
TraceUnions ((Table m h -> TableId) -> NonEmpty (Table m h) -> NonEmpty TableId
forall a b. (a -> b) -> NonEmpty a -> NonEmpty b
NE.map Table m h -> TableId
forall (m :: * -> *) h. Table m h -> TableId
tableId NonEmpty (Table m h)
ts)
let conf :: TableConfig
conf = Table m h -> TableConfig
forall (m :: * -> *) h. Table m h -> TableConfig
tableConfig (NonEmpty (Table m h) -> Table m h
forall a. NonEmpty a -> a
NE.last NonEmpty (Table m h)
ts)
m (SessionState m h)
-> (SessionState m h -> m ())
-> (ActionRegistry m
-> SessionState m h -> m (SessionState m h, Table m h))
-> m (Table m h)
forall (m :: * -> *) st a.
(PrimMonad m, MonadCatch m) =>
m st
-> (st -> m ()) -> (ActionRegistry m -> st -> m (st, a)) -> m a
modifyWithActionRegistry
(STM m (SessionState m h) -> m (SessionState m h)
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m (SessionState m h) -> m (SessionState m h))
-> STM m (SessionState m h) -> m (SessionState m h)
forall a b. (a -> b) -> a -> b
$ RWVar m (SessionState m h) -> STM m (SessionState m h)
forall (m :: * -> *) a. MonadSTM m => RWVar m a -> STM m a
RW.unsafeAcquireReadAccess (Session m h -> RWVar m (SessionState m h)
forall (m :: * -> *) h. Session m h -> RWVar m (SessionState m h)
sessionState Session m h
sesh))
(\SessionState m h
_ -> STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ RWVar m (SessionState m h) -> STM m ()
forall (m :: * -> *) a. MonadSTM m => RWVar m a -> STM m ()
RW.unsafeReleaseReadAccess (Session m h -> RWVar m (SessionState m h)
forall (m :: * -> *) h. Session m h -> RWVar m (SessionState m h)
sessionState Session m h
sesh)) ((ActionRegistry m
-> SessionState m h -> m (SessionState m h, Table m h))
-> m (Table m h))
-> (ActionRegistry m
-> SessionState m h -> m (SessionState m h, Table m h))
-> m (Table m h)
forall a b. (a -> b) -> a -> b
$
\ActionRegistry m
reg -> \case
SessionState m h
SessionClosed -> SessionClosedError -> m (SessionState m h, Table m h)
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO SessionClosedError
ErrSessionClosed
seshState :: SessionState m h
seshState@(SessionOpen SessionEnv m h
seshEnv) -> do
Table m h
t <- ActionRegistry m
-> Session m h
-> SessionEnv m h
-> TableConfig
-> NonEmpty (Table m h)
-> m (Table m h)
forall (m :: * -> *) h.
(MonadSTM m, MonadMask m, MonadMVar m, MonadST m) =>
ActionRegistry m
-> Session m h
-> SessionEnv m h
-> TableConfig
-> NonEmpty (Table m h)
-> m (Table m h)
unionsInOpenSession ActionRegistry m
reg Session m h
sesh SessionEnv m h
seshEnv TableConfig
conf NonEmpty (Table m h)
ts
(SessionState m h, Table m h) -> m (SessionState m h, Table m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (SessionState m h
seshState, Table m h
t)
{-# SPECIALISE unionsInOpenSession ::
ActionRegistry IO
-> Session IO h
-> SessionEnv IO h
-> TableConfig
-> NonEmpty (Table IO h)
-> IO (Table IO h) #-}
unionsInOpenSession ::
(MonadSTM m, MonadMask m, MonadMVar m, MonadST m)
=> ActionRegistry m
-> Session m h
-> SessionEnv m h
-> TableConfig
-> NonEmpty (Table m h)
-> m (Table m h)
unionsInOpenSession :: forall (m :: * -> *) h.
(MonadSTM m, MonadMask m, MonadMVar m, MonadST m) =>
ActionRegistry m
-> Session m h
-> SessionEnv m h
-> TableConfig
-> NonEmpty (Table m h)
-> m (Table m h)
unionsInOpenSession ActionRegistry m
reg Session m h
sesh SessionEnv m h
seshEnv TableConfig
conf NonEmpty (Table m h)
ts = do
[Ref (MergingTree m h)]
mts <- [Table m h]
-> (Table m h -> m (Ref (MergingTree m h)))
-> m [Ref (MergingTree m h)]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM (NonEmpty (Table m h) -> [Table m h]
forall a. NonEmpty a -> [a]
NE.toList NonEmpty (Table m h)
ts) ((Table m h -> m (Ref (MergingTree m h)))
-> m [Ref (MergingTree m h)])
-> (Table m h -> m (Ref (MergingTree m h)))
-> m [Ref (MergingTree m h)]
forall a b. (a -> b) -> a -> b
$ \Table m h
t ->
Table m h
-> (TableEnv m h -> m (Ref (MergingTree m h)))
-> m (Ref (MergingTree m h))
forall (m :: * -> *) h a.
(MonadSTM m, MonadThrow m) =>
Table m h -> (TableEnv m h -> m a) -> m a
withOpenTable Table m h
t ((TableEnv m h -> m (Ref (MergingTree m h)))
-> m (Ref (MergingTree m h)))
-> (TableEnv m h -> m (Ref (MergingTree m h)))
-> m (Ref (MergingTree m h))
forall a b. (a -> b) -> a -> b
$ \TableEnv m h
tEnv ->
RWVar m (TableContent m h)
-> (TableContent m h -> m (Ref (MergingTree m h)))
-> m (Ref (MergingTree m h))
forall (m :: * -> *) a b.
(MonadSTM m, MonadThrow m) =>
RWVar m a -> (a -> m b) -> m b
RW.withReadAccess (TableEnv m h -> RWVar m (TableContent m h)
forall (m :: * -> *) h. TableEnv m h -> RWVar m (TableContent m h)
tableContent TableEnv m h
tEnv) ((TableContent m h -> m (Ref (MergingTree m h)))
-> m (Ref (MergingTree m h)))
-> (TableContent m h -> m (Ref (MergingTree m h)))
-> m (Ref (MergingTree m h))
forall a b. (a -> b) -> a -> b
$ \TableContent m h
tc ->
ActionRegistry m
-> m (Ref (MergingTree m h))
-> (Ref (MergingTree m h) -> m ())
-> m (Ref (MergingTree m h))
forall (m :: * -> *) a.
(PrimMonad m, MonadMask m, HasCallStack) =>
ActionRegistry m -> m a -> (a -> m ()) -> m a
withRollback ActionRegistry m
reg
(SessionEnv m h
-> TableConfig -> TableContent m h -> m (Ref (MergingTree m h))
forall (m :: * -> *) h.
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m) =>
SessionEnv m h
-> TableConfig -> TableContent m h -> m (Ref (MergingTree m h))
tableContentToMergingTree SessionEnv m h
seshEnv TableConfig
conf TableContent m h
tc)
Ref (MergingTree m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef
Ref (MergingTree m h)
mt <- ActionRegistry m
-> m (Ref (MergingTree m h))
-> (Ref (MergingTree m h) -> m ())
-> m (Ref (MergingTree m h))
forall (m :: * -> *) a.
(PrimMonad m, MonadMask m, HasCallStack) =>
ActionRegistry m -> m a -> (a -> m ()) -> m a
withRollback ActionRegistry m
reg ([Ref (MergingTree m h)] -> m (Ref (MergingTree m h))
forall (m :: * -> *) h.
(MonadMVar m, MonadMask m, PrimMonad m) =>
[Ref (MergingTree m h)] -> m (Ref (MergingTree m h))
newPendingUnionMerge [Ref (MergingTree m h)]
mts) Ref (MergingTree m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef
[Ref (MergingTree m h)] -> (Ref (MergingTree m h) -> m ()) -> m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [Ref (MergingTree m h)]
mts (ActionRegistry m -> m () -> m ()
forall (m :: * -> *).
(PrimMonad m, HasCallStack) =>
ActionRegistry m -> m () -> m ()
delayedCommit ActionRegistry m
reg (m () -> m ())
-> (Ref (MergingTree m h) -> m ()) -> Ref (MergingTree m h) -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Ref (MergingTree m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef)
TableContent m h
empty <- SessionEnv m h -> ActionRegistry m -> m (TableContent m h)
forall (m :: * -> *) h.
(PrimMonad m, MonadMask m, MonadMVar m) =>
SessionEnv m h -> ActionRegistry m -> m (TableContent m h)
newEmptyTableContent SessionEnv m h
seshEnv ActionRegistry m
reg
let content :: TableContent m h
content = TableContent m h
empty { tableUnionLevel = Union mt }
am :: ArenaManager (PrimState m)
am = Table m h -> ArenaManager (PrimState m)
forall (m :: * -> *) h. Table m h -> ArenaManager (PrimState m)
tableArenaManager (NonEmpty (Table m h) -> Table m h
forall a. NonEmpty a -> a
NE.last NonEmpty (Table m h)
ts)
ActionRegistry m
-> Session m h
-> SessionEnv m h
-> TableConfig
-> ArenaManager (PrimState m)
-> TableContent m h
-> m (Table m h)
forall (m :: * -> *) h.
(MonadSTM m, MonadMVar m, PrimMonad m) =>
ActionRegistry m
-> Session m h
-> SessionEnv m h
-> TableConfig
-> ArenaManager (PrimState m)
-> TableContent m h
-> m (Table m h)
newWith ActionRegistry m
reg Session m h
sesh SessionEnv m h
seshEnv TableConfig
conf ArenaManager (PrimState m)
am TableContent m h
content
{-# SPECIALISE tableContentToMergingTree ::
SessionEnv IO h
-> TableConfig
-> TableContent IO h
-> IO (Ref (MergingTree IO h)) #-}
tableContentToMergingTree ::
forall m h.
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m)
=> SessionEnv m h
-> TableConfig
-> TableContent m h
-> m (Ref (MergingTree m h))
tableContentToMergingTree :: forall (m :: * -> *) h.
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m) =>
SessionEnv m h
-> TableConfig -> TableContent m h -> m (Ref (MergingTree m h))
tableContentToMergingTree SessionEnv m h
seshEnv TableConfig
conf
tc :: TableContent m h
tc@TableContent {
Levels m h
tableLevels :: forall (m :: * -> *) h. TableContent m h -> Levels m h
tableLevels :: Levels m h
tableLevels,
UnionLevel m h
tableUnionLevel :: forall (m :: * -> *) h. TableContent m h -> UnionLevel m h
tableUnionLevel :: UnionLevel m h
tableUnionLevel
} =
m (Maybe (Ref (Run m h)))
-> (Maybe (Ref (Run m h)) -> m ())
-> (Maybe (Ref (Run m h)) -> m (Ref (MergingTree m h)))
-> m (Ref (MergingTree m h))
forall a b c. m a -> (a -> m b) -> (a -> m c) -> m c
forall (m :: * -> *) a b c.
MonadThrow m =>
m a -> (a -> m b) -> (a -> m c) -> m c
bracket (SessionEnv m h
-> TableConfig -> TableContent m h -> m (Maybe (Ref (Run m h)))
forall (m :: * -> *) h.
(MonadMask m, MonadST m, MonadSTM m) =>
SessionEnv m h
-> TableConfig -> TableContent m h -> m (Maybe (Ref (Run m h)))
writeBufferToNewRun SessionEnv m h
seshEnv TableConfig
conf TableContent m h
tc)
((Ref (Run m h) -> m ()) -> Maybe (Ref (Run m h)) -> m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ Ref (Run m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef) ((Maybe (Ref (Run m h)) -> m (Ref (MergingTree m h)))
-> m (Ref (MergingTree m h)))
-> (Maybe (Ref (Run m h)) -> m (Ref (MergingTree m h)))
-> m (Ref (MergingTree m h))
forall a b. (a -> b) -> a -> b
$ \Maybe (Ref (Run m h))
mwbr ->
let runs :: [PreExistingRun m h]
runs :: [PreExistingRun m h]
runs = Maybe (PreExistingRun m h) -> [PreExistingRun m h]
forall a. Maybe a -> [a]
maybeToList ((Ref (Run m h) -> PreExistingRun m h)
-> Maybe (Ref (Run m h)) -> Maybe (PreExistingRun m h)
forall a b. (a -> b) -> Maybe a -> Maybe b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Ref (Run m h) -> PreExistingRun m h
forall (m :: * -> *) h. Ref (Run m h) -> PreExistingRun m h
PreExistingRun Maybe (Ref (Run m h))
mwbr)
[PreExistingRun m h]
-> [PreExistingRun m h] -> [PreExistingRun m h]
forall a. [a] -> [a] -> [a]
++ (Level m h -> [PreExistingRun m h])
-> [Level m h] -> [PreExistingRun m h]
forall (t :: * -> *) a b. Foldable t => (a -> [b]) -> t a -> [b]
concatMap Level m h -> [PreExistingRun m h]
levelToPreExistingRuns (Levels m h -> [Level m h]
forall a. Vector a -> [a]
V.toList Levels m h
tableLevels)
unionmt :: Maybe (Ref (MergingTree m h))
unionmt = case UnionLevel m h
tableUnionLevel of
UnionLevel m h
NoUnion -> Maybe (Ref (MergingTree m h))
forall a. Maybe a
Nothing
Union Ref (MergingTree m h)
mt -> Ref (MergingTree m h) -> Maybe (Ref (MergingTree m h))
forall a. a -> Maybe a
Just Ref (MergingTree m h)
mt
in [PreExistingRun m h]
-> Maybe (Ref (MergingTree m h)) -> m (Ref (MergingTree m h))
forall (m :: * -> *) h.
(MonadMVar m, MonadMask m, PrimMonad m) =>
[PreExistingRun m h]
-> Maybe (Ref (MergingTree m h)) -> m (Ref (MergingTree m h))
newPendingLevelMerge [PreExistingRun m h]
runs Maybe (Ref (MergingTree m h))
unionmt
where
levelToPreExistingRuns :: Level m h -> [PreExistingRun m h]
levelToPreExistingRuns :: Level m h -> [PreExistingRun m h]
levelToPreExistingRuns Level{IncomingRun m h
incomingRun :: IncomingRun m h
incomingRun :: forall (m :: * -> *) h. Level m h -> IncomingRun m h
incomingRun, Vector (Ref (Run m h))
residentRuns :: Vector (Ref (Run m h))
residentRuns :: forall (m :: * -> *) h. Level m h -> Vector (Ref (Run m h))
residentRuns} =
case IncomingRun m h
incomingRun of
Single Ref (Run m h)
r -> Ref (Run m h) -> PreExistingRun m h
forall (m :: * -> *) h. Ref (Run m h) -> PreExistingRun m h
PreExistingRun Ref (Run m h)
r
Merging MergePolicyForLevel
_ NominalDebt
_ PrimVar (PrimState m) NominalCredits
_ Ref (MergingRun LevelMergeType m h)
mr -> Ref (MergingRun LevelMergeType m h) -> PreExistingRun m h
forall (m :: * -> *) h.
Ref (MergingRun LevelMergeType m h) -> PreExistingRun m h
PreExistingMergingRun Ref (MergingRun LevelMergeType m h)
mr
PreExistingRun m h -> [PreExistingRun m h] -> [PreExistingRun m h]
forall a. a -> [a] -> [a]
: (Ref (Run m h) -> PreExistingRun m h)
-> [Ref (Run m h)] -> [PreExistingRun m h]
forall a b. (a -> b) -> [a] -> [b]
map Ref (Run m h) -> PreExistingRun m h
forall (m :: * -> *) h. Ref (Run m h) -> PreExistingRun m h
PreExistingRun (Vector (Ref (Run m h)) -> [Ref (Run m h)]
forall a. Vector a -> [a]
V.toList Vector (Ref (Run m h))
residentRuns)
{-# SPECIALISE writeBufferToNewRun ::
SessionEnv IO h
-> TableConfig
-> TableContent IO h
-> IO (Maybe (Ref (Run IO h))) #-}
writeBufferToNewRun ::
(MonadMask m, MonadST m, MonadSTM m)
=> SessionEnv m h
-> TableConfig
-> TableContent m h
-> m (Maybe (Ref (Run m h)))
writeBufferToNewRun :: forall (m :: * -> *) h.
(MonadMask m, MonadST m, MonadSTM m) =>
SessionEnv m h
-> TableConfig -> TableContent m h -> m (Maybe (Ref (Run m h)))
writeBufferToNewRun SessionEnv {
sessionRoot :: forall (m :: * -> *) h. SessionEnv m h -> SessionRoot
sessionRoot = SessionRoot
root,
sessionHasFS :: forall (m :: * -> *) h. SessionEnv m h -> HasFS m h
sessionHasFS = HasFS m h
hfs,
sessionHasBlockIO :: forall (m :: * -> *) h. SessionEnv m h -> HasBlockIO m h
sessionHasBlockIO = HasBlockIO m h
hbio,
sessionUniqCounter :: forall (m :: * -> *) h. SessionEnv m h -> UniqCounter m
sessionUniqCounter = UniqCounter m
uc
}
TableConfig
conf
TableContent{
WriteBuffer
tableWriteBuffer :: forall (m :: * -> *) h. TableContent m h -> WriteBuffer
tableWriteBuffer :: WriteBuffer
tableWriteBuffer,
Ref (WriteBufferBlobs m h)
tableWriteBufferBlobs :: forall (m :: * -> *) h.
TableContent m h -> Ref (WriteBufferBlobs m h)
tableWriteBufferBlobs :: Ref (WriteBufferBlobs m h)
tableWriteBufferBlobs
}
| WriteBuffer -> Bool
WB.null WriteBuffer
tableWriteBuffer = Maybe (Ref (Run m h)) -> m (Maybe (Ref (Run m h)))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe (Ref (Run m h))
forall a. Maybe a
Nothing
| Bool
otherwise = Ref (Run m h) -> Maybe (Ref (Run m h))
forall a. a -> Maybe a
Just (Ref (Run m h) -> Maybe (Ref (Run m h)))
-> m (Ref (Run m h)) -> m (Maybe (Ref (Run m h)))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> do
!Unique
uniq <- UniqCounter m -> m Unique
forall (m :: * -> *). PrimMonad m => UniqCounter m -> m Unique
incrUniqCounter UniqCounter m
uc
let (!RunParams
runParams, !RunFsPaths
runPaths) = ActiveDir
-> TableConfig -> Unique -> LevelNo -> (RunParams, RunFsPaths)
mergingRunParamsForLevel
(SessionRoot -> ActiveDir
Paths.activeDir SessionRoot
root) TableConfig
conf Unique
uniq (Int -> LevelNo
LevelNo Int
1)
HasFS m h
-> HasBlockIO m h
-> RunParams
-> RunFsPaths
-> WriteBuffer
-> Ref (WriteBufferBlobs m h)
-> m (Ref (Run m h))
forall (m :: * -> *) h.
(MonadST m, MonadSTM m, MonadMask m) =>
HasFS m h
-> HasBlockIO m h
-> RunParams
-> RunFsPaths
-> WriteBuffer
-> Ref (WriteBufferBlobs m h)
-> m (Ref (Run m h))
Run.fromWriteBuffer
HasFS m h
hfs HasBlockIO m h
hbio
RunParams
runParams RunFsPaths
runPaths
WriteBuffer
tableWriteBuffer
Ref (WriteBufferBlobs m h)
tableWriteBufferBlobs
{-# SPECIALISE ensureSessionsMatch ::
NonEmpty (Table IO h)
-> IO (Session IO h) #-}
ensureSessionsMatch ::
(MonadSTM m, MonadThrow m)
=> NonEmpty (Table m h)
-> m (Session m h)
ensureSessionsMatch :: forall (m :: * -> *) h.
(MonadSTM m, MonadThrow m) =>
NonEmpty (Table m h) -> m (Session m h)
ensureSessionsMatch (Table m h
t :| [Table m h]
ts) = do
let sesh :: Session m h
sesh = Table m h -> Session m h
forall (m :: * -> *) h. Table m h -> Session m h
tableSession Table m h
t
Session m h
-> (SessionEnv m h -> m (Session m h)) -> m (Session m h)
forall (m :: * -> *) h a.
(MonadSTM m, MonadThrow m) =>
Session m h -> (SessionEnv m h -> m a) -> m a
withOpenSession Session m h
sesh ((SessionEnv m h -> m (Session m h)) -> m (Session m h))
-> (SessionEnv m h -> m (Session m h)) -> m (Session m h)
forall a b. (a -> b) -> a -> b
$ \SessionEnv m h
seshEnv -> do
let root :: FsErrorPath
root = HasFS m h -> FsPath -> FsErrorPath
forall (m :: * -> *) h. HasFS m h -> FsPath -> FsErrorPath
FS.mkFsErrorPath (SessionEnv m h -> HasFS m h
forall (m :: * -> *) h. SessionEnv m h -> HasFS m h
sessionHasFS SessionEnv m h
seshEnv) (SessionRoot -> FsPath
getSessionRoot (SessionEnv m h -> SessionRoot
forall (m :: * -> *) h. SessionEnv m h -> SessionRoot
sessionRoot SessionEnv m h
seshEnv))
[(Int, Table m h)] -> ((Int, Table m h) -> m ()) -> m ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ ([Int] -> [Table m h] -> [(Int, Table m h)]
forall a b. [a] -> [b] -> [(a, b)]
zip [Int
1..] [Table m h]
ts) (((Int, Table m h) -> m ()) -> m ())
-> ((Int, Table m h) -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \(Int
i, Table m h
t') -> do
let sesh' :: Session m h
sesh' = Table m h -> Session m h
forall (m :: * -> *) h. Table m h -> Session m h
tableSession Table m h
t'
Session m h -> (SessionEnv m h -> m ()) -> m ()
forall (m :: * -> *) h a.
(MonadSTM m, MonadThrow m) =>
Session m h -> (SessionEnv m h -> m a) -> m a
withOpenSession Session m h
sesh' ((SessionEnv m h -> m ()) -> m ())
-> (SessionEnv m h -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \SessionEnv m h
seshEnv' -> do
let root' :: FsErrorPath
root' = HasFS m h -> FsPath -> FsErrorPath
forall (m :: * -> *) h. HasFS m h -> FsPath -> FsErrorPath
FS.mkFsErrorPath (SessionEnv m h -> HasFS m h
forall (m :: * -> *) h. SessionEnv m h -> HasFS m h
sessionHasFS SessionEnv m h
seshEnv') (SessionRoot -> FsPath
getSessionRoot (SessionEnv m h -> SessionRoot
forall (m :: * -> *) h. SessionEnv m h -> SessionRoot
sessionRoot SessionEnv m h
seshEnv'))
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (FsErrorPath
root FsErrorPath -> FsErrorPath -> Bool
forall a. Eq a => a -> a -> Bool
== FsErrorPath
root') (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ TableUnionNotCompatibleError -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (TableUnionNotCompatibleError -> m ())
-> TableUnionNotCompatibleError -> m ()
forall a b. (a -> b) -> a -> b
$ Int
-> FsErrorPath
-> Int
-> FsErrorPath
-> TableUnionNotCompatibleError
ErrTableUnionSessionMismatch Int
0 FsErrorPath
root Int
i FsErrorPath
root'
Session m h -> m (Session m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Session m h
sesh
newtype UnionDebt = UnionDebt Int
deriving newtype (Int -> UnionDebt -> ShowS
[UnionDebt] -> ShowS
UnionDebt -> String
(Int -> UnionDebt -> ShowS)
-> (UnionDebt -> String)
-> ([UnionDebt] -> ShowS)
-> Show UnionDebt
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> UnionDebt -> ShowS
showsPrec :: Int -> UnionDebt -> ShowS
$cshow :: UnionDebt -> String
show :: UnionDebt -> String
$cshowList :: [UnionDebt] -> ShowS
showList :: [UnionDebt] -> ShowS
Show, UnionDebt -> UnionDebt -> Bool
(UnionDebt -> UnionDebt -> Bool)
-> (UnionDebt -> UnionDebt -> Bool) -> Eq UnionDebt
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: UnionDebt -> UnionDebt -> Bool
== :: UnionDebt -> UnionDebt -> Bool
$c/= :: UnionDebt -> UnionDebt -> Bool
/= :: UnionDebt -> UnionDebt -> Bool
Eq, Eq UnionDebt
Eq UnionDebt =>
(UnionDebt -> UnionDebt -> Ordering)
-> (UnionDebt -> UnionDebt -> Bool)
-> (UnionDebt -> UnionDebt -> Bool)
-> (UnionDebt -> UnionDebt -> Bool)
-> (UnionDebt -> UnionDebt -> Bool)
-> (UnionDebt -> UnionDebt -> UnionDebt)
-> (UnionDebt -> UnionDebt -> UnionDebt)
-> Ord UnionDebt
UnionDebt -> UnionDebt -> Bool
UnionDebt -> UnionDebt -> Ordering
UnionDebt -> UnionDebt -> UnionDebt
forall a.
Eq a =>
(a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
$ccompare :: UnionDebt -> UnionDebt -> Ordering
compare :: UnionDebt -> UnionDebt -> Ordering
$c< :: UnionDebt -> UnionDebt -> Bool
< :: UnionDebt -> UnionDebt -> Bool
$c<= :: UnionDebt -> UnionDebt -> Bool
<= :: UnionDebt -> UnionDebt -> Bool
$c> :: UnionDebt -> UnionDebt -> Bool
> :: UnionDebt -> UnionDebt -> Bool
$c>= :: UnionDebt -> UnionDebt -> Bool
>= :: UnionDebt -> UnionDebt -> Bool
$cmax :: UnionDebt -> UnionDebt -> UnionDebt
max :: UnionDebt -> UnionDebt -> UnionDebt
$cmin :: UnionDebt -> UnionDebt -> UnionDebt
min :: UnionDebt -> UnionDebt -> UnionDebt
Ord, Integer -> UnionDebt
UnionDebt -> UnionDebt
UnionDebt -> UnionDebt -> UnionDebt
(UnionDebt -> UnionDebt -> UnionDebt)
-> (UnionDebt -> UnionDebt -> UnionDebt)
-> (UnionDebt -> UnionDebt -> UnionDebt)
-> (UnionDebt -> UnionDebt)
-> (UnionDebt -> UnionDebt)
-> (UnionDebt -> UnionDebt)
-> (Integer -> UnionDebt)
-> Num UnionDebt
forall a.
(a -> a -> a)
-> (a -> a -> a)
-> (a -> a -> a)
-> (a -> a)
-> (a -> a)
-> (a -> a)
-> (Integer -> a)
-> Num a
$c+ :: UnionDebt -> UnionDebt -> UnionDebt
+ :: UnionDebt -> UnionDebt -> UnionDebt
$c- :: UnionDebt -> UnionDebt -> UnionDebt
- :: UnionDebt -> UnionDebt -> UnionDebt
$c* :: UnionDebt -> UnionDebt -> UnionDebt
* :: UnionDebt -> UnionDebt -> UnionDebt
$cnegate :: UnionDebt -> UnionDebt
negate :: UnionDebt -> UnionDebt
$cabs :: UnionDebt -> UnionDebt
abs :: UnionDebt -> UnionDebt
$csignum :: UnionDebt -> UnionDebt
signum :: UnionDebt -> UnionDebt
$cfromInteger :: Integer -> UnionDebt
fromInteger :: Integer -> UnionDebt
Num)
{-# SPECIALISE remainingUnionDebt :: Table IO h -> IO UnionDebt #-}
remainingUnionDebt ::
(MonadSTM m, MonadMVar m, MonadThrow m, PrimMonad m)
=> Table m h -> m UnionDebt
remainingUnionDebt :: forall (m :: * -> *) h.
(MonadSTM m, MonadMVar m, MonadThrow m, PrimMonad m) =>
Table m h -> m UnionDebt
remainingUnionDebt Table m h
t = do
Tracer m TableTrace -> TableTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith (Table m h -> Tracer m TableTrace
forall (m :: * -> *) h. Table m h -> Tracer m TableTrace
tableTracer Table m h
t) TableTrace
TraceRemainingUnionDebt
Table m h -> (TableEnv m h -> m UnionDebt) -> m UnionDebt
forall (m :: * -> *) h a.
(MonadSTM m, MonadThrow m) =>
Table m h -> (TableEnv m h -> m a) -> m a
withOpenTable Table m h
t ((TableEnv m h -> m UnionDebt) -> m UnionDebt)
-> (TableEnv m h -> m UnionDebt) -> m UnionDebt
forall a b. (a -> b) -> a -> b
$ \TableEnv m h
tEnv -> do
RWVar m (TableContent m h)
-> (TableContent m h -> m UnionDebt) -> m UnionDebt
forall (m :: * -> *) a b.
(MonadSTM m, MonadThrow m) =>
RWVar m a -> (a -> m b) -> m b
RW.withReadAccess (TableEnv m h -> RWVar m (TableContent m h)
forall (m :: * -> *) h. TableEnv m h -> RWVar m (TableContent m h)
tableContent TableEnv m h
tEnv) ((TableContent m h -> m UnionDebt) -> m UnionDebt)
-> (TableContent m h -> m UnionDebt) -> m UnionDebt
forall a b. (a -> b) -> a -> b
$ \TableContent m h
tableContent -> do
case TableContent m h -> UnionLevel m h
forall (m :: * -> *) h. TableContent m h -> UnionLevel m h
tableUnionLevel TableContent m h
tableContent of
UnionLevel m h
NoUnion ->
UnionDebt -> m UnionDebt
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int -> UnionDebt
UnionDebt Int
0)
Union Ref (MergingTree m h)
mt -> do
(MergeDebt (MergeCredits Int
c), NumEntries
_) <- Ref (MergingTree m h) -> m (MergeDebt, NumEntries)
forall (m :: * -> *) h.
(MonadMVar m, PrimMonad m) =>
Ref (MergingTree m h) -> m (MergeDebt, NumEntries)
MT.remainingMergeDebt Ref (MergingTree m h)
mt
UnionDebt -> m UnionDebt
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int -> UnionDebt
UnionDebt Int
c)
newtype UnionCredits = UnionCredits Int
deriving newtype (Int -> UnionCredits -> ShowS
[UnionCredits] -> ShowS
UnionCredits -> String
(Int -> UnionCredits -> ShowS)
-> (UnionCredits -> String)
-> ([UnionCredits] -> ShowS)
-> Show UnionCredits
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> UnionCredits -> ShowS
showsPrec :: Int -> UnionCredits -> ShowS
$cshow :: UnionCredits -> String
show :: UnionCredits -> String
$cshowList :: [UnionCredits] -> ShowS
showList :: [UnionCredits] -> ShowS
Show, UnionCredits -> UnionCredits -> Bool
(UnionCredits -> UnionCredits -> Bool)
-> (UnionCredits -> UnionCredits -> Bool) -> Eq UnionCredits
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: UnionCredits -> UnionCredits -> Bool
== :: UnionCredits -> UnionCredits -> Bool
$c/= :: UnionCredits -> UnionCredits -> Bool
/= :: UnionCredits -> UnionCredits -> Bool
Eq, Eq UnionCredits
Eq UnionCredits =>
(UnionCredits -> UnionCredits -> Ordering)
-> (UnionCredits -> UnionCredits -> Bool)
-> (UnionCredits -> UnionCredits -> Bool)
-> (UnionCredits -> UnionCredits -> Bool)
-> (UnionCredits -> UnionCredits -> Bool)
-> (UnionCredits -> UnionCredits -> UnionCredits)
-> (UnionCredits -> UnionCredits -> UnionCredits)
-> Ord UnionCredits
UnionCredits -> UnionCredits -> Bool
UnionCredits -> UnionCredits -> Ordering
UnionCredits -> UnionCredits -> UnionCredits
forall a.
Eq a =>
(a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
$ccompare :: UnionCredits -> UnionCredits -> Ordering
compare :: UnionCredits -> UnionCredits -> Ordering
$c< :: UnionCredits -> UnionCredits -> Bool
< :: UnionCredits -> UnionCredits -> Bool
$c<= :: UnionCredits -> UnionCredits -> Bool
<= :: UnionCredits -> UnionCredits -> Bool
$c> :: UnionCredits -> UnionCredits -> Bool
> :: UnionCredits -> UnionCredits -> Bool
$c>= :: UnionCredits -> UnionCredits -> Bool
>= :: UnionCredits -> UnionCredits -> Bool
$cmax :: UnionCredits -> UnionCredits -> UnionCredits
max :: UnionCredits -> UnionCredits -> UnionCredits
$cmin :: UnionCredits -> UnionCredits -> UnionCredits
min :: UnionCredits -> UnionCredits -> UnionCredits
Ord, Integer -> UnionCredits
UnionCredits -> UnionCredits
UnionCredits -> UnionCredits -> UnionCredits
(UnionCredits -> UnionCredits -> UnionCredits)
-> (UnionCredits -> UnionCredits -> UnionCredits)
-> (UnionCredits -> UnionCredits -> UnionCredits)
-> (UnionCredits -> UnionCredits)
-> (UnionCredits -> UnionCredits)
-> (UnionCredits -> UnionCredits)
-> (Integer -> UnionCredits)
-> Num UnionCredits
forall a.
(a -> a -> a)
-> (a -> a -> a)
-> (a -> a -> a)
-> (a -> a)
-> (a -> a)
-> (a -> a)
-> (Integer -> a)
-> Num a
$c+ :: UnionCredits -> UnionCredits -> UnionCredits
+ :: UnionCredits -> UnionCredits -> UnionCredits
$c- :: UnionCredits -> UnionCredits -> UnionCredits
- :: UnionCredits -> UnionCredits -> UnionCredits
$c* :: UnionCredits -> UnionCredits -> UnionCredits
* :: UnionCredits -> UnionCredits -> UnionCredits
$cnegate :: UnionCredits -> UnionCredits
negate :: UnionCredits -> UnionCredits
$cabs :: UnionCredits -> UnionCredits
abs :: UnionCredits -> UnionCredits
$csignum :: UnionCredits -> UnionCredits
signum :: UnionCredits -> UnionCredits
$cfromInteger :: Integer -> UnionCredits
fromInteger :: Integer -> UnionCredits
Num)
{-# SPECIALISE supplyUnionCredits ::
ResolveSerialisedValue -> Table IO h -> UnionCredits -> IO UnionCredits #-}
supplyUnionCredits ::
(MonadST m, MonadSTM m, MonadMVar m, MonadMask m)
=> ResolveSerialisedValue -> Table m h -> UnionCredits -> m UnionCredits
supplyUnionCredits :: forall (m :: * -> *) h.
(MonadST m, MonadSTM m, MonadMVar m, MonadMask m) =>
ResolveSerialisedValue
-> Table m h -> UnionCredits -> m UnionCredits
supplyUnionCredits ResolveSerialisedValue
resolve Table m h
t UnionCredits
credits = do
Tracer m TableTrace -> TableTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith (Table m h -> Tracer m TableTrace
forall (m :: * -> *) h. Table m h -> Tracer m TableTrace
tableTracer Table m h
t) (TableTrace -> m ()) -> TableTrace -> m ()
forall a b. (a -> b) -> a -> b
$ UnionCredits -> TableTrace
TraceSupplyUnionCredits UnionCredits
credits
Table m h -> (TableEnv m h -> m UnionCredits) -> m UnionCredits
forall (m :: * -> *) h a.
(MonadSTM m, MonadThrow m) =>
Table m h -> (TableEnv m h -> m a) -> m a
withOpenTable Table m h
t ((TableEnv m h -> m UnionCredits) -> m UnionCredits)
-> (TableEnv m h -> m UnionCredits) -> m UnionCredits
forall a b. (a -> b) -> a -> b
$ \TableEnv m h
tEnv -> do
RWVar m (TableContent m h)
-> (TableContent m h -> m UnionCredits) -> m UnionCredits
forall (m :: * -> *) a b.
(MonadSTM m, MonadThrow m) =>
RWVar m a -> (a -> m b) -> m b
RW.withReadAccess (TableEnv m h -> RWVar m (TableContent m h)
forall (m :: * -> *) h. TableEnv m h -> RWVar m (TableContent m h)
tableContent TableEnv m h
tEnv) ((TableContent m h -> m UnionCredits) -> m UnionCredits)
-> (TableContent m h -> m UnionCredits) -> m UnionCredits
forall a b. (a -> b) -> a -> b
$ \TableContent m h
tableContent -> do
case TableContent m h -> UnionLevel m h
forall (m :: * -> *) h. TableContent m h -> UnionLevel m h
tableUnionLevel TableContent m h
tableContent of
UnionLevel m h
NoUnion ->
UnionCredits -> m UnionCredits
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (UnionCredits -> UnionCredits -> UnionCredits
forall a. Ord a => a -> a -> a
max UnionCredits
0 UnionCredits
credits)
Union Ref (MergingTree m h)
mt -> do
let conf :: TableConfig
conf = Table m h -> TableConfig
forall (m :: * -> *) h. Table m h -> TableConfig
tableConfig Table m h
t
let AllocNumEntries (NumEntries Int
x) = TableConfig -> WriteBufferAlloc
confWriteBufferAlloc TableConfig
conf
let thresh :: CreditThreshold
thresh = UnspentCredits -> CreditThreshold
MR.CreditThreshold (MergeCredits -> UnspentCredits
MR.UnspentCredits (Int -> MergeCredits
MergeCredits Int
x))
MergeCredits Int
leftovers <-
HasFS m h
-> HasBlockIO m h
-> ResolveSerialisedValue
-> RunParams
-> CreditThreshold
-> SessionRoot
-> UniqCounter m
-> Ref (MergingTree m h)
-> MergeCredits
-> m MergeCredits
forall (m :: * -> *) h.
(MonadMVar m, MonadST m, MonadSTM m, MonadMask m) =>
HasFS m h
-> HasBlockIO m h
-> ResolveSerialisedValue
-> RunParams
-> CreditThreshold
-> SessionRoot
-> UniqCounter m
-> Ref (MergingTree m h)
-> MergeCredits
-> m MergeCredits
MT.supplyCredits
(TableEnv m h -> HasFS m h
forall (m :: * -> *) h. TableEnv m h -> HasFS m h
tableHasFS TableEnv m h
tEnv)
(TableEnv m h -> HasBlockIO m h
forall (m :: * -> *) h. TableEnv m h -> HasBlockIO m h
tableHasBlockIO TableEnv m h
tEnv)
ResolveSerialisedValue
resolve
(TableConfig -> RunLevelNo -> RunParams
runParamsForLevel TableConfig
conf RunLevelNo
UnionLevel)
CreditThreshold
thresh
(TableEnv m h -> SessionRoot
forall (m :: * -> *) h. TableEnv m h -> SessionRoot
tableSessionRoot TableEnv m h
tEnv)
(TableEnv m h -> UniqCounter m
forall (m :: * -> *) h. TableEnv m h -> UniqCounter m
tableSessionUniqCounter TableEnv m h
tEnv)
Ref (MergingTree m h)
mt
(let UnionCredits Int
c = UnionCredits
credits in Int -> MergeCredits
MergeCredits Int
c)
UnionCredits -> m UnionCredits
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int -> UnionCredits
UnionCredits Int
leftovers)