{-# LANGUAGE DataKinds #-}

-- | This module brings together the internal parts to provide an API in terms
-- of untyped serialised keys, values and blobs. It makes no distinction between
-- normal and monoidal tables, accepting both blobs and mupserts.
-- The typed [normal]("Database.LSMTree.Normal") and
-- [monoidal]("Database.LSMTree.Monoidal") APIs then provide more type-safe
-- wrappers and handle serialisation.
--
-- Apart from defining the API, this module mainly deals with concurrency- and
-- exception-safe opening and closing of resources. Any other non-trivial logic
-- should live somewhere else.
--
module Database.LSMTree.Internal (
    -- * Existentials
    Session' (..)
  , Table' (..)
  , Cursor' (..)
  , NormalTable (..)
  , NormalCursor (..)
  , MonoidalTable (..)
  , MonoidalCursor (..)
    -- * Exceptions
  , SessionDirDoesNotExistError (..)
  , SessionDirLockedError (..)
  , SessionDirCorruptedError (..)
  , SessionClosedError (..)
  , TableClosedError (..)
  , TableCorruptedError (..)
  , TableTooLargeError (..)
  , TableUnionNotCompatibleError (..)
  , SnapshotExistsError (..)
  , SnapshotDoesNotExistError (..)
  , SnapshotCorruptedError (..)
  , SnapshotNotCompatibleError (..)
  , BlobRefInvalidError (..)
  , CursorClosedError (..)
  , FileFormat (..)
  , FileCorruptedError (..)
  , Paths.InvalidSnapshotNameError (..)
    -- * Tracing
  , LSMTreeTrace (..)
  , TableTrace (..)
    -- * Session
  , Session (..)
  , SessionState (..)
  , SessionEnv (..)
  , withOpenSession
    -- ** Implementation of public API
  , withSession
  , openSession
  , closeSession
    -- * Table
  , Table (..)
  , TableState (..)
  , TableEnv (..)
  , withOpenTable
    -- ** Implementation of public API
  , ResolveSerialisedValue
  , withTable
  , new
  , close
  , lookups
  , rangeLookup
  , updates
  , retrieveBlobs
    -- ** Cursor API
  , Cursor (..)
  , CursorState (..)
  , CursorEnv (..)
  , OffsetKey (..)
  , withCursor
  , newCursor
  , closeCursor
  , readCursor
  , readCursorWhile
    -- * Snapshots
  , SnapshotLabel
  , createSnapshot
  , openSnapshot
  , deleteSnapshot
  , doesSnapshotExist
  , listSnapshots
    -- * Multiple writable tables
  , duplicate
    -- * Table union
  , unions
  , UnionDebt (..)
  , remainingUnionDebt
  , UnionCredits (..)
  , supplyUnionCredits
  ) where

import           Control.ActionRegistry
import           Control.Concurrent.Class.MonadMVar.Strict
import           Control.Concurrent.Class.MonadSTM (MonadSTM (..))
import           Control.Concurrent.Class.MonadSTM.RWVar (RWVar)
import qualified Control.Concurrent.Class.MonadSTM.RWVar as RW
import           Control.DeepSeq
import           Control.Monad (forM, unless, void)
import           Control.Monad.Class.MonadAsync as Async
import           Control.Monad.Class.MonadST (MonadST (..))
import           Control.Monad.Class.MonadThrow
import           Control.Monad.Primitive
import           Control.RefCount
import           Control.Tracer
import           Data.Either (fromRight)
import           Data.Foldable
import           Data.Kind
import           Data.List.NonEmpty (NonEmpty (..))
import qualified Data.List.NonEmpty as NE
import           Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import           Data.Maybe (catMaybes, fromMaybe, maybeToList)
import           Data.Monoid (First (..))
import qualified Data.Set as Set
import           Data.Typeable
import qualified Data.Vector as V
import           Database.LSMTree.Internal.Arena (ArenaManager, newArenaManager)
import           Database.LSMTree.Internal.BlobRef (WeakBlobRef (..))
import qualified Database.LSMTree.Internal.BlobRef as BlobRef
import           Database.LSMTree.Internal.Config
import           Database.LSMTree.Internal.CRC32C (FileCorruptedError (..),
                     FileFormat (..))
import qualified Database.LSMTree.Internal.Cursor as Cursor
import           Database.LSMTree.Internal.Entry (Entry, NumEntries (..))
import           Database.LSMTree.Internal.IncomingRun (IncomingRun (..))
import           Database.LSMTree.Internal.Lookup (ResolveSerialisedValue,
                     TableCorruptedError (..), lookupsIO,
                     lookupsIOWithWriteBuffer)
import           Database.LSMTree.Internal.MergeSchedule
import           Database.LSMTree.Internal.MergingRun (TableTooLargeError (..))
import qualified Database.LSMTree.Internal.MergingRun as MR
import           Database.LSMTree.Internal.MergingTree
import qualified Database.LSMTree.Internal.MergingTree as MT
import qualified Database.LSMTree.Internal.MergingTree.Lookup as MT
import           Database.LSMTree.Internal.Paths (SessionRoot (..),
                     SnapshotMetaDataChecksumFile (..),
                     SnapshotMetaDataFile (..), SnapshotName)
import qualified Database.LSMTree.Internal.Paths as Paths
import           Database.LSMTree.Internal.Range (Range (..))
import           Database.LSMTree.Internal.Run (Run)
import qualified Database.LSMTree.Internal.Run as Run
import           Database.LSMTree.Internal.RunNumber
import           Database.LSMTree.Internal.RunReaders (OffsetKey (..))
import qualified Database.LSMTree.Internal.RunReaders as Readers
import           Database.LSMTree.Internal.Serialise (SerialisedBlob (..),
                     SerialisedKey, SerialisedValue)
import           Database.LSMTree.Internal.Snapshot
import           Database.LSMTree.Internal.Snapshot.Codec
import           Database.LSMTree.Internal.UniqCounter
import qualified Database.LSMTree.Internal.Vector as V
import qualified Database.LSMTree.Internal.WriteBuffer as WB
import qualified Database.LSMTree.Internal.WriteBufferBlobs as WBB
import qualified System.FS.API as FS
import           System.FS.API (FsError, FsErrorPath (..), FsPath, HasFS)
import qualified System.FS.BlockIO.API as FS
import           System.FS.BlockIO.API (HasBlockIO)

{-------------------------------------------------------------------------------
  Existentials
-------------------------------------------------------------------------------}

type Session' :: (Type -> Type) -> Type
data Session' m = forall h. Typeable h => Session' !(Session m h)

instance NFData (Session' m) where
  rnf :: Session' m -> ()
rnf (Session' Session m h
s) = Session m h -> ()
forall a. NFData a => a -> ()
rnf Session m h
s

type Table' :: (Type -> Type) -> Type -> Type -> Type -> Type
data Table' m k v b = forall h. Typeable h => Table' (Table m h)

instance NFData (Table' m k v b) where
  rnf :: Table' m k v b -> ()
rnf (Table' Table m h
t) = Table m h -> ()
forall a. NFData a => a -> ()
rnf Table m h
t

type Cursor' :: (Type -> Type) -> Type -> Type -> Type -> Type
data Cursor' m k v b = forall h. Typeable h => Cursor' (Cursor m h)

instance NFData (Cursor' m k v b) where
  rnf :: Cursor' m k v b -> ()
rnf (Cursor' Cursor m h
t) = Cursor m h -> ()
forall a. NFData a => a -> ()
rnf Cursor m h
t

type NormalTable :: (Type -> Type) -> Type -> Type -> Type -> Type
data NormalTable m k v b = forall h. Typeable h =>
    NormalTable !(Table m h)

instance NFData (NormalTable m k v b) where
  rnf :: NormalTable m k v b -> ()
rnf (NormalTable Table m h
t) = Table m h -> ()
forall a. NFData a => a -> ()
rnf Table m h
t

type NormalCursor :: (Type -> Type) -> Type -> Type -> Type -> Type
data NormalCursor m k v b = forall h. Typeable h =>
    NormalCursor !(Cursor m h)

instance NFData (NormalCursor m k v b) where
  rnf :: NormalCursor m k v b -> ()
rnf (NormalCursor Cursor m h
c) = Cursor m h -> ()
forall a. NFData a => a -> ()
rnf Cursor m h
c

type MonoidalTable :: (Type -> Type) -> Type -> Type -> Type
data MonoidalTable m k v = forall h. Typeable h =>
    MonoidalTable !(Table m h)

instance NFData (MonoidalTable m k v) where
  rnf :: MonoidalTable m k v -> ()
rnf (MonoidalTable Table m h
t) = Table m h -> ()
forall a. NFData a => a -> ()
rnf Table m h
t

type MonoidalCursor :: (Type -> Type) -> Type -> Type -> Type
data MonoidalCursor m k v = forall h. Typeable h =>
    MonoidalCursor !(Cursor m h)

instance NFData (MonoidalCursor m k v) where
  rnf :: MonoidalCursor m k v -> ()
rnf (MonoidalCursor Cursor m h
c) = Cursor m h -> ()
forall a. NFData a => a -> ()
rnf Cursor m h
c

{-------------------------------------------------------------------------------
  Traces
-------------------------------------------------------------------------------}

data LSMTreeTrace =
    -- Session
    TraceOpenSession FsPath
  | TraceNewSession
  | TraceRestoreSession
  | TraceCloseSession
    -- Table
  | TraceNewTable
  | TraceOpenSnapshot SnapshotName TableConfigOverride
  | TraceTable TableId TableTrace
  | TraceDeleteSnapshot SnapshotName
  | TraceListSnapshots
    -- Cursor
  | TraceCursor CursorId CursorTrace
    -- Unions
  | TraceUnions (NonEmpty TableId)
  deriving stock Int -> LSMTreeTrace -> ShowS
[LSMTreeTrace] -> ShowS
LSMTreeTrace -> String
(Int -> LSMTreeTrace -> ShowS)
-> (LSMTreeTrace -> String)
-> ([LSMTreeTrace] -> ShowS)
-> Show LSMTreeTrace
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> LSMTreeTrace -> ShowS
showsPrec :: Int -> LSMTreeTrace -> ShowS
$cshow :: LSMTreeTrace -> String
show :: LSMTreeTrace -> String
$cshowList :: [LSMTreeTrace] -> ShowS
showList :: [LSMTreeTrace] -> ShowS
Show

data TableTrace =
    -- | A table is created with the specified config.
    --
    -- This message is traced in addition to messages like 'TraceNewTable' and
    -- 'TraceDuplicate'.
    TraceCreateTable TableConfig
  | TraceCloseTable
    -- Lookups
  | TraceLookups Int
  | TraceRangeLookup (Range SerialisedKey)
    -- Updates
  | TraceUpdates Int
  | TraceMerge (AtLevel MergeTrace)
    -- Snapshot
  | TraceSnapshot SnapshotName
    -- Duplicate
  | TraceDuplicate
    -- Unions
  | TraceRemainingUnionDebt
  | TraceSupplyUnionCredits UnionCredits
  deriving stock Int -> TableTrace -> ShowS
[TableTrace] -> ShowS
TableTrace -> String
(Int -> TableTrace -> ShowS)
-> (TableTrace -> String)
-> ([TableTrace] -> ShowS)
-> Show TableTrace
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> TableTrace -> ShowS
showsPrec :: Int -> TableTrace -> ShowS
$cshow :: TableTrace -> String
show :: TableTrace -> String
$cshowList :: [TableTrace] -> ShowS
showList :: [TableTrace] -> ShowS
Show

data CursorTrace =
    TraceCreateCursor TableId
  | TraceCloseCursor
  | TraceReadCursor Int
  deriving stock Int -> CursorTrace -> ShowS
[CursorTrace] -> ShowS
CursorTrace -> String
(Int -> CursorTrace -> ShowS)
-> (CursorTrace -> String)
-> ([CursorTrace] -> ShowS)
-> Show CursorTrace
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> CursorTrace -> ShowS
showsPrec :: Int -> CursorTrace -> ShowS
$cshow :: CursorTrace -> String
show :: CursorTrace -> String
$cshowList :: [CursorTrace] -> ShowS
showList :: [CursorTrace] -> ShowS
Show

{-------------------------------------------------------------------------------
  Session
-------------------------------------------------------------------------------}

-- | A session provides context that is shared across multiple tables.
--
-- For more information, see 'Database.LSMTree.Common.Session'.
data Session m h = Session {
      -- | The primary purpose of this 'RWVar' is to ensure consistent views of
      -- the open-/closedness of a session when multiple threads require access
      -- to the session's fields (see 'withOpenSession'). We use more
      -- fine-grained synchronisation for various mutable parts of an open
      -- session.
      --
      -- INVARIANT: once the session state is changed from 'SessionOpen' to
      -- 'SessionClosed', it is never changed back to 'SessionOpen' again.
      forall (m :: * -> *) h. Session m h -> RWVar m (SessionState m h)
sessionState  :: !(RWVar m (SessionState m h))
    , forall (m :: * -> *) h. Session m h -> Tracer m LSMTreeTrace
sessionTracer :: !(Tracer m LSMTreeTrace)
    }

instance NFData (Session m h) where
  rnf :: Session m h -> ()
rnf (Session RWVar m (SessionState m h)
a Tracer m LSMTreeTrace
b) = RWVar m (SessionState m h) -> ()
forall a. NFData a => a -> ()
rnf RWVar m (SessionState m h)
a () -> () -> ()
forall a b. a -> b -> b
`seq` Tracer m LSMTreeTrace -> ()
forall a. a -> ()
rwhnf Tracer m LSMTreeTrace
b

data SessionState m h =
    SessionOpen !(SessionEnv m h)
  | SessionClosed

data SessionEnv m h = SessionEnv {
    -- | The path to the directory in which this session is live. This is a path
    -- relative to root of the 'HasFS' instance.
    --
    -- INVARIANT: the session root is never changed during the lifetime of a
    -- session.
    forall (m :: * -> *) h. SessionEnv m h -> SessionRoot
sessionRoot        :: !SessionRoot
  , forall (m :: * -> *) h. SessionEnv m h -> HasFS m h
sessionHasFS       :: !(HasFS m h)
  , forall (m :: * -> *) h. SessionEnv m h -> HasBlockIO m h
sessionHasBlockIO  :: !(HasBlockIO m h)
  , forall (m :: * -> *) h. SessionEnv m h -> LockFileHandle m
sessionLockFile    :: !(FS.LockFileHandle m)
    -- | A session-wide shared, atomic counter that is used to produce unique
    -- names, for example: run names, table IDs.
  , forall (m :: * -> *) h. SessionEnv m h -> UniqCounter m
sessionUniqCounter :: !(UniqCounter m)
    -- | Open tables are tracked here so they can be closed once the session is
    -- closed. Tables also become untracked when they are closed manually.
    --
    -- Tables are assigned unique identifiers using 'sessionUniqCounter' to
    -- ensure that modifications to the set of known tables are independent.
    -- Each identifier is added only once in 'new', 'openSnapshot', 'duplicate',
    -- 'union', or 'unions', and is deleted only once in 'close' or
    -- 'closeSession'.
    --
    -- * A new table may only insert its own identifier when it has acquired the
    --   'sessionState' read-lock. This is to prevent races with 'closeSession'.
    --
    -- * A table 'close' may delete its own identifier from the set of open
    --   tables without restrictions, even concurrently with 'closeSession'.
    --   This is safe because 'close' is idempotent'.
  , forall (m :: * -> *) h.
SessionEnv m h -> StrictMVar m (Map TableId (Table m h))
sessionOpenTables  :: !(StrictMVar m (Map TableId (Table m h)))
    -- | Similarly to tables, open cursors are tracked so they can be closed
    -- once the session is closed. See 'sessionOpenTables'.
  , forall (m :: * -> *) h.
SessionEnv m h -> StrictMVar m (Map CursorId (Cursor m h))
sessionOpenCursors :: !(StrictMVar m (Map CursorId (Cursor m h)))
  }

-- | The session is closed.
data SessionClosedError
    = ErrSessionClosed
    deriving stock (Int -> SessionClosedError -> ShowS
[SessionClosedError] -> ShowS
SessionClosedError -> String
(Int -> SessionClosedError -> ShowS)
-> (SessionClosedError -> String)
-> ([SessionClosedError] -> ShowS)
-> Show SessionClosedError
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> SessionClosedError -> ShowS
showsPrec :: Int -> SessionClosedError -> ShowS
$cshow :: SessionClosedError -> String
show :: SessionClosedError -> String
$cshowList :: [SessionClosedError] -> ShowS
showList :: [SessionClosedError] -> ShowS
Show, SessionClosedError -> SessionClosedError -> Bool
(SessionClosedError -> SessionClosedError -> Bool)
-> (SessionClosedError -> SessionClosedError -> Bool)
-> Eq SessionClosedError
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: SessionClosedError -> SessionClosedError -> Bool
== :: SessionClosedError -> SessionClosedError -> Bool
$c/= :: SessionClosedError -> SessionClosedError -> Bool
/= :: SessionClosedError -> SessionClosedError -> Bool
Eq)
    deriving anyclass (Show SessionClosedError
Typeable SessionClosedError
(Typeable SessionClosedError, Show SessionClosedError) =>
(SessionClosedError -> SomeException)
-> (SomeException -> Maybe SessionClosedError)
-> (SessionClosedError -> String)
-> Exception SessionClosedError
SomeException -> Maybe SessionClosedError
SessionClosedError -> String
SessionClosedError -> SomeException
forall e.
(Typeable e, Show e) =>
(e -> SomeException)
-> (SomeException -> Maybe e) -> (e -> String) -> Exception e
$ctoException :: SessionClosedError -> SomeException
toException :: SessionClosedError -> SomeException
$cfromException :: SomeException -> Maybe SessionClosedError
fromException :: SomeException -> Maybe SessionClosedError
$cdisplayException :: SessionClosedError -> String
displayException :: SessionClosedError -> String
Exception)

{-# INLINE withOpenSession #-}
{-# SPECIALISE withOpenSession ::
     Session IO h
  -> (SessionEnv IO h -> IO a)
  -> IO a #-}
-- | 'withOpenSession' ensures that the session stays open for the duration of the
-- provided continuation.
--
-- NOTE: any operation except 'sessionClose' can use this function.
withOpenSession ::
     (MonadSTM m, MonadThrow m)
  => Session m h
  -> (SessionEnv m h -> m a)
  -> m a
withOpenSession :: forall (m :: * -> *) h a.
(MonadSTM m, MonadThrow m) =>
Session m h -> (SessionEnv m h -> m a) -> m a
withOpenSession Session m h
sesh SessionEnv m h -> m a
action = RWVar m (SessionState m h) -> (SessionState m h -> m a) -> m a
forall (m :: * -> *) a b.
(MonadSTM m, MonadThrow m) =>
RWVar m a -> (a -> m b) -> m b
RW.withReadAccess (Session m h -> RWVar m (SessionState m h)
forall (m :: * -> *) h. Session m h -> RWVar m (SessionState m h)
sessionState Session m h
sesh) ((SessionState m h -> m a) -> m a)
-> (SessionState m h -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ \case
    SessionState m h
SessionClosed -> SessionClosedError -> m a
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO SessionClosedError
ErrSessionClosed
    SessionOpen SessionEnv m h
seshEnv -> SessionEnv m h -> m a
action SessionEnv m h
seshEnv

--
-- Implementation of public API
--

{-# SPECIALISE withSession ::
     Tracer IO LSMTreeTrace
  -> HasFS IO h
  -> HasBlockIO IO h
  -> FsPath
  -> (Session IO h -> IO a)
  -> IO a #-}
-- | See 'Database.LSMTree.Common.withSession'.
withSession ::
     (MonadMask m, MonadSTM m, MonadMVar m, PrimMonad m)
  => Tracer m LSMTreeTrace
  -> HasFS m h
  -> HasBlockIO m h
  -> FsPath
  -> (Session m h -> m a)
  -> m a
withSession :: forall (m :: * -> *) h a.
(MonadMask m, MonadSTM m, MonadMVar m, PrimMonad m) =>
Tracer m LSMTreeTrace
-> HasFS m h
-> HasBlockIO m h
-> FsPath
-> (Session m h -> m a)
-> m a
withSession Tracer m LSMTreeTrace
tr HasFS m h
hfs HasBlockIO m h
hbio FsPath
dir = m (Session m h)
-> (Session m h -> m ()) -> (Session m h -> m a) -> m a
forall a b c. m a -> (a -> m b) -> (a -> m c) -> m c
forall (m :: * -> *) a b c.
MonadThrow m =>
m a -> (a -> m b) -> (a -> m c) -> m c
bracket (Tracer m LSMTreeTrace
-> HasFS m h -> HasBlockIO m h -> FsPath -> m (Session m h)
forall (m :: * -> *) h.
(MonadSTM m, MonadMVar m, PrimMonad m, MonadMask m) =>
Tracer m LSMTreeTrace
-> HasFS m h -> HasBlockIO m h -> FsPath -> m (Session m h)
openSession Tracer m LSMTreeTrace
tr HasFS m h
hfs HasBlockIO m h
hbio FsPath
dir) Session m h -> m ()
forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadMVar m, PrimMonad m) =>
Session m h -> m ()
closeSession

-- | The session directory does not exist.
data SessionDirDoesNotExistError
    = ErrSessionDirDoesNotExist !FsErrorPath
    deriving stock (Int -> SessionDirDoesNotExistError -> ShowS
[SessionDirDoesNotExistError] -> ShowS
SessionDirDoesNotExistError -> String
(Int -> SessionDirDoesNotExistError -> ShowS)
-> (SessionDirDoesNotExistError -> String)
-> ([SessionDirDoesNotExistError] -> ShowS)
-> Show SessionDirDoesNotExistError
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> SessionDirDoesNotExistError -> ShowS
showsPrec :: Int -> SessionDirDoesNotExistError -> ShowS
$cshow :: SessionDirDoesNotExistError -> String
show :: SessionDirDoesNotExistError -> String
$cshowList :: [SessionDirDoesNotExistError] -> ShowS
showList :: [SessionDirDoesNotExistError] -> ShowS
Show, SessionDirDoesNotExistError -> SessionDirDoesNotExistError -> Bool
(SessionDirDoesNotExistError
 -> SessionDirDoesNotExistError -> Bool)
-> (SessionDirDoesNotExistError
    -> SessionDirDoesNotExistError -> Bool)
-> Eq SessionDirDoesNotExistError
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: SessionDirDoesNotExistError -> SessionDirDoesNotExistError -> Bool
== :: SessionDirDoesNotExistError -> SessionDirDoesNotExistError -> Bool
$c/= :: SessionDirDoesNotExistError -> SessionDirDoesNotExistError -> Bool
/= :: SessionDirDoesNotExistError -> SessionDirDoesNotExistError -> Bool
Eq)
    deriving anyclass (Show SessionDirDoesNotExistError
Typeable SessionDirDoesNotExistError
(Typeable SessionDirDoesNotExistError,
 Show SessionDirDoesNotExistError) =>
(SessionDirDoesNotExistError -> SomeException)
-> (SomeException -> Maybe SessionDirDoesNotExistError)
-> (SessionDirDoesNotExistError -> String)
-> Exception SessionDirDoesNotExistError
SomeException -> Maybe SessionDirDoesNotExistError
SessionDirDoesNotExistError -> String
SessionDirDoesNotExistError -> SomeException
forall e.
(Typeable e, Show e) =>
(e -> SomeException)
-> (SomeException -> Maybe e) -> (e -> String) -> Exception e
$ctoException :: SessionDirDoesNotExistError -> SomeException
toException :: SessionDirDoesNotExistError -> SomeException
$cfromException :: SomeException -> Maybe SessionDirDoesNotExistError
fromException :: SomeException -> Maybe SessionDirDoesNotExistError
$cdisplayException :: SessionDirDoesNotExistError -> String
displayException :: SessionDirDoesNotExistError -> String
Exception)

-- | The session directory is locked by another active session.
data SessionDirLockedError
    = ErrSessionDirLocked !FsErrorPath
    deriving stock (Int -> SessionDirLockedError -> ShowS
[SessionDirLockedError] -> ShowS
SessionDirLockedError -> String
(Int -> SessionDirLockedError -> ShowS)
-> (SessionDirLockedError -> String)
-> ([SessionDirLockedError] -> ShowS)
-> Show SessionDirLockedError
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> SessionDirLockedError -> ShowS
showsPrec :: Int -> SessionDirLockedError -> ShowS
$cshow :: SessionDirLockedError -> String
show :: SessionDirLockedError -> String
$cshowList :: [SessionDirLockedError] -> ShowS
showList :: [SessionDirLockedError] -> ShowS
Show, SessionDirLockedError -> SessionDirLockedError -> Bool
(SessionDirLockedError -> SessionDirLockedError -> Bool)
-> (SessionDirLockedError -> SessionDirLockedError -> Bool)
-> Eq SessionDirLockedError
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: SessionDirLockedError -> SessionDirLockedError -> Bool
== :: SessionDirLockedError -> SessionDirLockedError -> Bool
$c/= :: SessionDirLockedError -> SessionDirLockedError -> Bool
/= :: SessionDirLockedError -> SessionDirLockedError -> Bool
Eq)
    deriving anyclass (Show SessionDirLockedError
Typeable SessionDirLockedError
(Typeable SessionDirLockedError, Show SessionDirLockedError) =>
(SessionDirLockedError -> SomeException)
-> (SomeException -> Maybe SessionDirLockedError)
-> (SessionDirLockedError -> String)
-> Exception SessionDirLockedError
SomeException -> Maybe SessionDirLockedError
SessionDirLockedError -> String
SessionDirLockedError -> SomeException
forall e.
(Typeable e, Show e) =>
(e -> SomeException)
-> (SomeException -> Maybe e) -> (e -> String) -> Exception e
$ctoException :: SessionDirLockedError -> SomeException
toException :: SessionDirLockedError -> SomeException
$cfromException :: SomeException -> Maybe SessionDirLockedError
fromException :: SomeException -> Maybe SessionDirLockedError
$cdisplayException :: SessionDirLockedError -> String
displayException :: SessionDirLockedError -> String
Exception)

-- | The session directory is corrupted, e.g., it misses required files or contains unexpected files.
data SessionDirCorruptedError
    = ErrSessionDirCorrupted !FsErrorPath
    deriving stock (Int -> SessionDirCorruptedError -> ShowS
[SessionDirCorruptedError] -> ShowS
SessionDirCorruptedError -> String
(Int -> SessionDirCorruptedError -> ShowS)
-> (SessionDirCorruptedError -> String)
-> ([SessionDirCorruptedError] -> ShowS)
-> Show SessionDirCorruptedError
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> SessionDirCorruptedError -> ShowS
showsPrec :: Int -> SessionDirCorruptedError -> ShowS
$cshow :: SessionDirCorruptedError -> String
show :: SessionDirCorruptedError -> String
$cshowList :: [SessionDirCorruptedError] -> ShowS
showList :: [SessionDirCorruptedError] -> ShowS
Show, SessionDirCorruptedError -> SessionDirCorruptedError -> Bool
(SessionDirCorruptedError -> SessionDirCorruptedError -> Bool)
-> (SessionDirCorruptedError -> SessionDirCorruptedError -> Bool)
-> Eq SessionDirCorruptedError
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: SessionDirCorruptedError -> SessionDirCorruptedError -> Bool
== :: SessionDirCorruptedError -> SessionDirCorruptedError -> Bool
$c/= :: SessionDirCorruptedError -> SessionDirCorruptedError -> Bool
/= :: SessionDirCorruptedError -> SessionDirCorruptedError -> Bool
Eq)
    deriving anyclass (Show SessionDirCorruptedError
Typeable SessionDirCorruptedError
(Typeable SessionDirCorruptedError,
 Show SessionDirCorruptedError) =>
(SessionDirCorruptedError -> SomeException)
-> (SomeException -> Maybe SessionDirCorruptedError)
-> (SessionDirCorruptedError -> String)
-> Exception SessionDirCorruptedError
SomeException -> Maybe SessionDirCorruptedError
SessionDirCorruptedError -> String
SessionDirCorruptedError -> SomeException
forall e.
(Typeable e, Show e) =>
(e -> SomeException)
-> (SomeException -> Maybe e) -> (e -> String) -> Exception e
$ctoException :: SessionDirCorruptedError -> SomeException
toException :: SessionDirCorruptedError -> SomeException
$cfromException :: SomeException -> Maybe SessionDirCorruptedError
fromException :: SomeException -> Maybe SessionDirCorruptedError
$cdisplayException :: SessionDirCorruptedError -> String
displayException :: SessionDirCorruptedError -> String
Exception)

{-# SPECIALISE openSession ::
     Tracer IO LSMTreeTrace
  -> HasFS IO h
  -> HasBlockIO IO h
  -> FsPath
  -> IO (Session IO h) #-}
-- | See 'Database.LSMTree.Common.openSession'.
openSession ::
     forall m h.
     (MonadSTM m, MonadMVar m, PrimMonad m, MonadMask m)
  => Tracer m LSMTreeTrace
  -> HasFS m h
  -> HasBlockIO m h -- TODO: could we prevent the user from having to pass this in?
  -> FsPath -- ^ Path to the session directory
  -> m (Session m h)
openSession :: forall (m :: * -> *) h.
(MonadSTM m, MonadMVar m, PrimMonad m, MonadMask m) =>
Tracer m LSMTreeTrace
-> HasFS m h -> HasBlockIO m h -> FsPath -> m (Session m h)
openSession Tracer m LSMTreeTrace
tr HasFS m h
hfs HasBlockIO m h
hbio FsPath
dir =
    -- We can not use modifyWithActionRegistry here, since there is no in-memory
    -- state to modify. We use withActionRegistry instead, which may have a tiny
    -- chance of leaking resources if openSession is not called in a masked
    -- state.
    (ActionRegistry m -> m (Session m h)) -> m (Session m h)
forall (m :: * -> *) a.
(PrimMonad m, MonadCatch m) =>
(ActionRegistry m -> m a) -> m a
withActionRegistry ((ActionRegistry m -> m (Session m h)) -> m (Session m h))
-> (ActionRegistry m -> m (Session m h)) -> m (Session m h)
forall a b. (a -> b) -> a -> b
$ \ActionRegistry m
reg -> do
      Tracer m LSMTreeTrace -> LSMTreeTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith Tracer m LSMTreeTrace
tr (FsPath -> LSMTreeTrace
TraceOpenSession FsPath
dir)
      Bool
dirExists <- HasFS m h -> HasCallStack => FsPath -> m Bool
forall (m :: * -> *) h.
HasFS m h -> HasCallStack => FsPath -> m Bool
FS.doesDirectoryExist HasFS m h
hfs FsPath
dir
      Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
dirExists (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
        SessionDirDoesNotExistError -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (FsErrorPath -> SessionDirDoesNotExistError
ErrSessionDirDoesNotExist (HasFS m h -> FsPath -> FsErrorPath
forall (m :: * -> *) h. HasFS m h -> FsPath -> FsErrorPath
FS.mkFsErrorPath HasFS m h
hfs FsPath
dir))
      -- List directory contents /before/ trying to acquire a file lock, so that
      -- that the lock file does not show up in the listed contents.
      Set String
dirContents <- HasFS m h -> HasCallStack => FsPath -> m (Set String)
forall (m :: * -> *) h.
HasFS m h -> HasCallStack => FsPath -> m (Set String)
FS.listDirectory HasFS m h
hfs FsPath
dir
      -- Try to acquire the session file lock as soon as possible to reduce the
      -- risk of race conditions.
      --
      -- The lock is only released when an exception is raised, otherwise the lock
      -- is included in the returned Session.
      Either FsError (Maybe (LockFileHandle m))
elock <-
        ActionRegistry m
-> (Either FsError (Maybe (LockFileHandle m))
    -> Maybe (LockFileHandle m))
-> m (Either FsError (Maybe (LockFileHandle m)))
-> (LockFileHandle m -> m ())
-> m (Either FsError (Maybe (LockFileHandle m)))
forall (m :: * -> *) a b.
(PrimMonad m, MonadMask m, HasCallStack) =>
ActionRegistry m -> (a -> Maybe b) -> m a -> (b -> m ()) -> m a
withRollbackFun ActionRegistry m
reg
          (Maybe (LockFileHandle m)
-> Either FsError (Maybe (LockFileHandle m))
-> Maybe (LockFileHandle m)
forall b a. b -> Either a b -> b
fromRight Maybe (LockFileHandle m)
forall a. Maybe a
Nothing)
          m (Either FsError (Maybe (LockFileHandle m)))
acquireLock
          LockFileHandle m -> m ()
forall {m :: * -> *}. LockFileHandle m -> m ()
releaseLock

      case Either FsError (Maybe (LockFileHandle m))
elock of
        Left FsError
e
          | FsErrorType
FS.FsResourceAlreadyInUse <- FsError -> FsErrorType
FS.fsErrorType FsError
e
          , fsep :: FsErrorPath
fsep@(FsErrorPath Maybe MountPoint
_ FsPath
fsp) <- FsError -> FsErrorPath
FS.fsErrorPath FsError
e
          , FsPath
fsp FsPath -> FsPath -> Bool
forall a. Eq a => a -> a -> Bool
== FsPath
lockFilePath
          -> SessionDirLockedError -> m (Session m h)
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (FsErrorPath -> SessionDirLockedError
ErrSessionDirLocked FsErrorPath
fsep)
        Left  FsError
e -> FsError -> m (Session m h)
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO FsError
e -- rethrow unexpected errors
        Right Maybe (LockFileHandle m)
Nothing -> SessionDirLockedError -> m (Session m h)
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (FsErrorPath -> SessionDirLockedError
ErrSessionDirLocked (HasFS m h -> FsPath -> FsErrorPath
forall (m :: * -> *) h. HasFS m h -> FsPath -> FsErrorPath
FS.mkFsErrorPath HasFS m h
hfs FsPath
lockFilePath))
        Right (Just LockFileHandle m
sessionFileLock) ->
          if Set String -> Bool
forall a. Set a -> Bool
Set.null Set String
dirContents then ActionRegistry m -> LockFileHandle m -> m (Session m h)
newSession ActionRegistry m
reg LockFileHandle m
sessionFileLock
                                  else ActionRegistry m -> LockFileHandle m -> m (Session m h)
restoreSession ActionRegistry m
reg LockFileHandle m
sessionFileLock
  where
    root :: SessionRoot
root             = FsPath -> SessionRoot
Paths.SessionRoot FsPath
dir
    lockFilePath :: FsPath
lockFilePath     = SessionRoot -> FsPath
Paths.lockFile SessionRoot
root
    activeDirPath :: FsPath
activeDirPath    = ActiveDir -> FsPath
Paths.getActiveDir (SessionRoot -> ActiveDir
Paths.activeDir SessionRoot
root)
    snapshotsDirPath :: FsPath
snapshotsDirPath = SessionRoot -> FsPath
Paths.snapshotsDir SessionRoot
root

    acquireLock :: m (Either FsError (Maybe (LockFileHandle m)))
acquireLock = forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> m (Either e a)
try @m @FsError (m (Maybe (LockFileHandle m))
 -> m (Either FsError (Maybe (LockFileHandle m))))
-> m (Maybe (LockFileHandle m))
-> m (Either FsError (Maybe (LockFileHandle m)))
forall a b. (a -> b) -> a -> b
$ HasBlockIO m h
-> FsPath -> LockMode -> m (Maybe (LockFileHandle m))
forall (m :: * -> *) h.
HasBlockIO m h
-> FsPath -> LockMode -> m (Maybe (LockFileHandle m))
FS.tryLockFile HasBlockIO m h
hbio FsPath
lockFilePath LockMode
FS.ExclusiveLock

    releaseLock :: LockFileHandle m -> m ()
releaseLock = LockFileHandle m -> m ()
forall {m :: * -> *}. LockFileHandle m -> m ()
FS.hUnlock

    mkSession :: LockFileHandle m -> m (Session m h)
mkSession LockFileHandle m
lockFile = do
        UniqCounter m
counterVar <- Int -> m (UniqCounter m)
forall (m :: * -> *). PrimMonad m => Int -> m (UniqCounter m)
newUniqCounter Int
0
        StrictMVar m (Map TableId (Table m h))
openTablesVar <- Map TableId (Table m h)
-> m (StrictMVar m (Map TableId (Table m h)))
forall (m :: * -> *) a. MonadMVar m => a -> m (StrictMVar m a)
newMVar Map TableId (Table m h)
forall k a. Map k a
Map.empty
        StrictMVar m (Map CursorId (Cursor m h))
openCursorsVar <- Map CursorId (Cursor m h)
-> m (StrictMVar m (Map CursorId (Cursor m h)))
forall (m :: * -> *) a. MonadMVar m => a -> m (StrictMVar m a)
newMVar Map CursorId (Cursor m h)
forall k a. Map k a
Map.empty
        RWVar m (SessionState m h)
sessionVar <- SessionState m h -> m (RWVar m (SessionState m h))
forall (m :: * -> *) a. MonadSTM m => a -> m (RWVar m a)
RW.new (SessionState m h -> m (RWVar m (SessionState m h)))
-> SessionState m h -> m (RWVar m (SessionState m h))
forall a b. (a -> b) -> a -> b
$ SessionEnv m h -> SessionState m h
forall (m :: * -> *) h. SessionEnv m h -> SessionState m h
SessionOpen (SessionEnv m h -> SessionState m h)
-> SessionEnv m h -> SessionState m h
forall a b. (a -> b) -> a -> b
$ SessionEnv {
            sessionRoot :: SessionRoot
sessionRoot = SessionRoot
root
          , sessionHasFS :: HasFS m h
sessionHasFS = HasFS m h
hfs
          , sessionHasBlockIO :: HasBlockIO m h
sessionHasBlockIO = HasBlockIO m h
hbio
          , sessionLockFile :: LockFileHandle m
sessionLockFile = LockFileHandle m
lockFile
          , sessionUniqCounter :: UniqCounter m
sessionUniqCounter = UniqCounter m
counterVar
          , sessionOpenTables :: StrictMVar m (Map TableId (Table m h))
sessionOpenTables = StrictMVar m (Map TableId (Table m h))
openTablesVar
          , sessionOpenCursors :: StrictMVar m (Map CursorId (Cursor m h))
sessionOpenCursors = StrictMVar m (Map CursorId (Cursor m h))
openCursorsVar
          }
        Session m h -> m (Session m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Session m h -> m (Session m h)) -> Session m h -> m (Session m h)
forall a b. (a -> b) -> a -> b
$! RWVar m (SessionState m h) -> Tracer m LSMTreeTrace -> Session m h
forall (m :: * -> *) h.
RWVar m (SessionState m h) -> Tracer m LSMTreeTrace -> Session m h
Session RWVar m (SessionState m h)
sessionVar Tracer m LSMTreeTrace
tr

    newSession :: ActionRegistry m -> LockFileHandle m -> m (Session m h)
newSession ActionRegistry m
reg LockFileHandle m
sessionFileLock = do
        Tracer m LSMTreeTrace -> LSMTreeTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith Tracer m LSMTreeTrace
tr LSMTreeTrace
TraceNewSession
        ActionRegistry m -> m () -> m () -> m ()
forall (m :: * -> *) a.
(PrimMonad m, MonadMask m, HasCallStack) =>
ActionRegistry m -> m a -> m () -> m a
withRollback_ ActionRegistry m
reg
          (HasFS m h -> HasCallStack => FsPath -> m ()
forall (m :: * -> *) h. HasFS m h -> HasCallStack => FsPath -> m ()
FS.createDirectory HasFS m h
hfs FsPath
activeDirPath)
          (HasFS m h -> HasCallStack => FsPath -> m ()
forall (m :: * -> *) h. HasFS m h -> HasCallStack => FsPath -> m ()
FS.removeDirectoryRecursive HasFS m h
hfs FsPath
activeDirPath)
        ActionRegistry m -> m () -> m () -> m ()
forall (m :: * -> *) a.
(PrimMonad m, MonadMask m, HasCallStack) =>
ActionRegistry m -> m a -> m () -> m a
withRollback_ ActionRegistry m
reg
          (HasFS m h -> HasCallStack => FsPath -> m ()
forall (m :: * -> *) h. HasFS m h -> HasCallStack => FsPath -> m ()
FS.createDirectory HasFS m h
hfs FsPath
snapshotsDirPath)
          (HasFS m h -> HasCallStack => FsPath -> m ()
forall (m :: * -> *) h. HasFS m h -> HasCallStack => FsPath -> m ()
FS.removeDirectoryRecursive HasFS m h
hfs FsPath
snapshotsDirPath)
        LockFileHandle m -> m (Session m h)
mkSession LockFileHandle m
sessionFileLock

    restoreSession :: ActionRegistry m -> LockFileHandle m -> m (Session m h)
restoreSession ActionRegistry m
_reg LockFileHandle m
sessionFileLock = do
        Tracer m LSMTreeTrace -> LSMTreeTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith Tracer m LSMTreeTrace
tr LSMTreeTrace
TraceRestoreSession
        -- If the layouts are wrong, we throw an exception
        m ()
checkTopLevelDirLayout

        -- Clear the active directory by removing the directory and recreating
        -- it again.
        HasFS m h -> HasCallStack => FsPath -> m ()
forall (m :: * -> *) h. HasFS m h -> HasCallStack => FsPath -> m ()
FS.removeDirectoryRecursive HasFS m h
hfs FsPath
activeDirPath
          m () -> m () -> m ()
forall a b. m a -> m b -> m a
forall (m :: * -> *) a b. MonadThrow m => m a -> m b -> m a
`finally` HasFS m h -> HasCallStack => Bool -> FsPath -> m ()
forall (m :: * -> *) h.
HasFS m h -> HasCallStack => Bool -> FsPath -> m ()
FS.createDirectoryIfMissing HasFS m h
hfs Bool
False FsPath
activeDirPath

        m ()
checkActiveDirLayout
        m ()
checkSnapshotsDirLayout
        LockFileHandle m -> m (Session m h)
mkSession LockFileHandle m
sessionFileLock

    -- Check that the active directory and snapshots directory exist. We assume
    -- the lock file already exists at this point.
    --
    -- This checks only that the /expected/ files and directories exist.
    -- Unexpected files in the top-level directory are ignored for the layout
    -- check.
    checkTopLevelDirLayout :: m ()
checkTopLevelDirLayout = do
      HasFS m h -> HasCallStack => FsPath -> m Bool
forall (m :: * -> *) h.
HasFS m h -> HasCallStack => FsPath -> m Bool
FS.doesDirectoryExist HasFS m h
hfs FsPath
activeDirPath m Bool -> (Bool -> m ()) -> m ()
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \Bool
b ->
        Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
b (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ SessionDirCorruptedError -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (FsErrorPath -> SessionDirCorruptedError
ErrSessionDirCorrupted (HasFS m h -> FsPath -> FsErrorPath
forall (m :: * -> *) h. HasFS m h -> FsPath -> FsErrorPath
FS.mkFsErrorPath HasFS m h
hfs FsPath
activeDirPath))
      HasFS m h -> HasCallStack => FsPath -> m Bool
forall (m :: * -> *) h.
HasFS m h -> HasCallStack => FsPath -> m Bool
FS.doesDirectoryExist HasFS m h
hfs FsPath
snapshotsDirPath m Bool -> (Bool -> m ()) -> m ()
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \Bool
b ->
        Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
b (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ SessionDirCorruptedError -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (FsErrorPath -> SessionDirCorruptedError
ErrSessionDirCorrupted (HasFS m h -> FsPath -> FsErrorPath
forall (m :: * -> *) h. HasFS m h -> FsPath -> FsErrorPath
FS.mkFsErrorPath HasFS m h
hfs FsPath
snapshotsDirPath))

    -- The active directory should be empty
    checkActiveDirLayout :: m ()
checkActiveDirLayout = do
        Set String
contents <- HasFS m h -> HasCallStack => FsPath -> m (Set String)
forall (m :: * -> *) h.
HasFS m h -> HasCallStack => FsPath -> m (Set String)
FS.listDirectory HasFS m h
hfs FsPath
activeDirPath
        Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (Set String -> Bool
forall a. Set a -> Bool
Set.null Set String
contents) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ SessionDirCorruptedError -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (FsErrorPath -> SessionDirCorruptedError
ErrSessionDirCorrupted (HasFS m h -> FsPath -> FsErrorPath
forall (m :: * -> *) h. HasFS m h -> FsPath -> FsErrorPath
FS.mkFsErrorPath HasFS m h
hfs FsPath
activeDirPath))

    -- Nothing to check: snapshots are verified when they are loaded, not when a
    -- session is restored.
    checkSnapshotsDirLayout :: m ()
checkSnapshotsDirLayout = () -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()

{-# SPECIALISE closeSession :: Session IO h -> IO () #-}
-- | See 'Database.LSMTree.Common.closeSession'.
--
-- A session's global resources will only be released once it is sure that no
-- tables or cursors are open anymore.
closeSession ::
     (MonadMask m, MonadSTM m, MonadMVar m, PrimMonad m)
  => Session m h
  -> m ()
closeSession :: forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadMVar m, PrimMonad m) =>
Session m h -> m ()
closeSession Session{RWVar m (SessionState m h)
sessionState :: forall (m :: * -> *) h. Session m h -> RWVar m (SessionState m h)
sessionState :: RWVar m (SessionState m h)
sessionState, Tracer m LSMTreeTrace
sessionTracer :: forall (m :: * -> *) h. Session m h -> Tracer m LSMTreeTrace
sessionTracer :: Tracer m LSMTreeTrace
sessionTracer} = do
    Tracer m LSMTreeTrace -> LSMTreeTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith Tracer m LSMTreeTrace
sessionTracer LSMTreeTrace
TraceCloseSession
    m (SessionState m h)
-> (SessionState m h -> m ())
-> (ActionRegistry m -> SessionState m h -> m (SessionState m h))
-> m ()
forall (m :: * -> *) st.
(PrimMonad m, MonadCatch m) =>
m st -> (st -> m ()) -> (ActionRegistry m -> st -> m st) -> m ()
modifyWithActionRegistry_
      (RWVar m (SessionState m h) -> m (SessionState m h)
forall (m :: * -> *) a.
(MonadSTM m, MonadCatch m) =>
RWVar m a -> m a
RW.unsafeAcquireWriteAccess RWVar m (SessionState m h)
sessionState)
      (STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ())
-> (SessionState m h -> STM m ()) -> SessionState m h -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. RWVar m (SessionState m h) -> SessionState m h -> STM m ()
forall (m :: * -> *) a. MonadSTM m => RWVar m a -> a -> STM m ()
RW.unsafeReleaseWriteAccess RWVar m (SessionState m h)
sessionState)
      ((ActionRegistry m -> SessionState m h -> m (SessionState m h))
 -> m ())
-> (ActionRegistry m -> SessionState m h -> m (SessionState m h))
-> m ()
forall a b. (a -> b) -> a -> b
$ \ActionRegistry m
reg -> \case
        SessionState m h
SessionClosed -> SessionState m h -> m (SessionState m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure SessionState m h
forall (m :: * -> *) h. SessionState m h
SessionClosed
        SessionOpen SessionEnv m h
seshEnv -> do
          -- Close tables and cursors first, so that we know none are open when we
          -- release session-wide resources.
          --
          -- If any tables or cursors have been closed already by a different
          -- thread, then the idempotent close functions will act like a no-op,
          -- and so we are not in trouble.
          --
          -- Since we have a write lock on the session state, we know that no
          -- tables or cursors will be added while we are closing the session
          -- (see sessionOpenTables), and that we are the only thread currently
          -- closing the session. .
          --
          -- We technically don't have to overwrite this with an empty Map, but
          -- why not.

          -- close cursors
          Map CursorId (Cursor m h)
cursors <-
            ActionRegistry m
-> m (Map CursorId (Cursor m h))
-> (Map CursorId (Cursor m h) -> m ())
-> m (Map CursorId (Cursor m h))
forall (m :: * -> *) a.
(PrimMonad m, MonadMask m, HasCallStack) =>
ActionRegistry m -> m a -> (a -> m ()) -> m a
withRollback ActionRegistry m
reg
              (StrictMVar m (Map CursorId (Cursor m h))
-> Map CursorId (Cursor m h) -> m (Map CursorId (Cursor m h))
forall (m :: * -> *) a. MonadMVar m => StrictMVar m a -> a -> m a
swapMVar (SessionEnv m h -> StrictMVar m (Map CursorId (Cursor m h))
forall (m :: * -> *) h.
SessionEnv m h -> StrictMVar m (Map CursorId (Cursor m h))
sessionOpenCursors SessionEnv m h
seshEnv) Map CursorId (Cursor m h)
forall k a. Map k a
Map.empty)
              (m (Map CursorId (Cursor m h)) -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m (Map CursorId (Cursor m h)) -> m ())
-> (Map CursorId (Cursor m h) -> m (Map CursorId (Cursor m h)))
-> Map CursorId (Cursor m h)
-> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StrictMVar m (Map CursorId (Cursor m h))
-> Map CursorId (Cursor m h) -> m (Map CursorId (Cursor m h))
forall (m :: * -> *) a. MonadMVar m => StrictMVar m a -> a -> m a
swapMVar (SessionEnv m h -> StrictMVar m (Map CursorId (Cursor m h))
forall (m :: * -> *) h.
SessionEnv m h -> StrictMVar m (Map CursorId (Cursor m h))
sessionOpenCursors SessionEnv m h
seshEnv))
          (Cursor m h -> m ()) -> Map CursorId (Cursor m h) -> m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (ActionRegistry m -> m () -> m ()
forall (m :: * -> *).
(PrimMonad m, HasCallStack) =>
ActionRegistry m -> m () -> m ()
delayedCommit ActionRegistry m
reg (m () -> m ()) -> (Cursor m h -> m ()) -> Cursor m h -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Cursor m h -> m ()
forall (m :: * -> *) h.
(MonadMask m, MonadMVar m, MonadSTM m, PrimMonad m) =>
Cursor m h -> m ()
closeCursor) Map CursorId (Cursor m h)
cursors

          -- close tables
          Map TableId (Table m h)
tables <-
            ActionRegistry m
-> m (Map TableId (Table m h))
-> (Map TableId (Table m h) -> m ())
-> m (Map TableId (Table m h))
forall (m :: * -> *) a.
(PrimMonad m, MonadMask m, HasCallStack) =>
ActionRegistry m -> m a -> (a -> m ()) -> m a
withRollback ActionRegistry m
reg
              (StrictMVar m (Map TableId (Table m h))
-> Map TableId (Table m h) -> m (Map TableId (Table m h))
forall (m :: * -> *) a. MonadMVar m => StrictMVar m a -> a -> m a
swapMVar (SessionEnv m h -> StrictMVar m (Map TableId (Table m h))
forall (m :: * -> *) h.
SessionEnv m h -> StrictMVar m (Map TableId (Table m h))
sessionOpenTables SessionEnv m h
seshEnv) Map TableId (Table m h)
forall k a. Map k a
Map.empty)
              (m (Map TableId (Table m h)) -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m (Map TableId (Table m h)) -> m ())
-> (Map TableId (Table m h) -> m (Map TableId (Table m h)))
-> Map TableId (Table m h)
-> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StrictMVar m (Map TableId (Table m h))
-> Map TableId (Table m h) -> m (Map TableId (Table m h))
forall (m :: * -> *) a. MonadMVar m => StrictMVar m a -> a -> m a
swapMVar (SessionEnv m h -> StrictMVar m (Map TableId (Table m h))
forall (m :: * -> *) h.
SessionEnv m h -> StrictMVar m (Map TableId (Table m h))
sessionOpenTables SessionEnv m h
seshEnv))
          (Table m h -> m ()) -> Map TableId (Table m h) -> m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (ActionRegistry m -> m () -> m ()
forall (m :: * -> *).
(PrimMonad m, HasCallStack) =>
ActionRegistry m -> m () -> m ()
delayedCommit ActionRegistry m
reg (m () -> m ()) -> (Table m h -> m ()) -> Table m h -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Table m h -> m ()
forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadMVar m, PrimMonad m) =>
Table m h -> m ()
close) Map TableId (Table m h)
tables

          ActionRegistry m -> m () -> m ()
forall (m :: * -> *).
(PrimMonad m, HasCallStack) =>
ActionRegistry m -> m () -> m ()
delayedCommit ActionRegistry m
reg (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ HasBlockIO m h -> HasCallStack => m ()
forall (m :: * -> *) h. HasBlockIO m h -> HasCallStack => m ()
FS.close (SessionEnv m h -> HasBlockIO m h
forall (m :: * -> *) h. SessionEnv m h -> HasBlockIO m h
sessionHasBlockIO SessionEnv m h
seshEnv)
          ActionRegistry m -> m () -> m ()
forall (m :: * -> *).
(PrimMonad m, HasCallStack) =>
ActionRegistry m -> m () -> m ()
delayedCommit ActionRegistry m
reg (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ LockFileHandle m -> m ()
forall {m :: * -> *}. LockFileHandle m -> m ()
FS.hUnlock (SessionEnv m h -> LockFileHandle m
forall (m :: * -> *) h. SessionEnv m h -> LockFileHandle m
sessionLockFile SessionEnv m h
seshEnv)

          SessionState m h -> m (SessionState m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure SessionState m h
forall (m :: * -> *) h. SessionState m h
SessionClosed

{-------------------------------------------------------------------------------
  Table
-------------------------------------------------------------------------------}

-- | A handle to an on-disk key\/value table.
--
-- For more information, see 'Database.LSMTree.Normal.Table'.
data Table m h = Table {
      forall (m :: * -> *) h. Table m h -> TableConfig
tableConfig       :: !TableConfig
      -- | The primary purpose of this 'RWVar' is to ensure consistent views of
      -- the open-/closedness of a table when multiple threads require access to
      -- the table's fields (see 'withOpenTable'). We use more fine-grained
      -- synchronisation for various mutable parts of an open table.
    , forall (m :: * -> *) h. Table m h -> RWVar m (TableState m h)
tableState        :: !(RWVar m (TableState m h))
    , forall (m :: * -> *) h. Table m h -> ArenaManager (PrimState m)
tableArenaManager :: !(ArenaManager (PrimState m))
    , forall (m :: * -> *) h. Table m h -> Tracer m TableTrace
tableTracer       :: !(Tracer m TableTrace)
      -- | Session-unique identifier for this table.
      --
      -- INVARIANT: a table's identifier is never changed during the lifetime of
      -- the table.
    , forall (m :: * -> *) h. Table m h -> TableId
tableId           :: !TableId

      -- === Session-inherited

      -- | The session that this table belongs to.
      --
      -- INVARIANT: a table only ever belongs to one session, and can't be
      -- transferred to a different session.
    , forall (m :: * -> *) h. Table m h -> Session m h
tableSession      :: !(Session m h)
    }

instance NFData (Table m h) where
  rnf :: Table m h -> ()
rnf (Table TableConfig
a RWVar m (TableState m h)
b ArenaManager (PrimState m)
c Tracer m TableTrace
d TableId
e Session m h
f) =
    TableConfig -> ()
forall a. NFData a => a -> ()
rnf TableConfig
a () -> () -> ()
forall a b. a -> b -> b
`seq` RWVar m (TableState m h) -> ()
forall a. NFData a => a -> ()
rnf RWVar m (TableState m h)
b () -> () -> ()
forall a b. a -> b -> b
`seq` ArenaManager (PrimState m) -> ()
forall a. NFData a => a -> ()
rnf ArenaManager (PrimState m)
c () -> () -> ()
forall a b. a -> b -> b
`seq` Tracer m TableTrace -> ()
forall a. a -> ()
rwhnf Tracer m TableTrace
d () -> () -> ()
forall a b. a -> b -> b
`seq` TableId -> ()
forall a. NFData a => a -> ()
rnf TableId
e() -> () -> ()
forall a b. a -> b -> b
`seq` Session m h -> ()
forall a. a -> ()
rwhnf Session m h
f

-- | A table may assume that its corresponding session is still open as
-- long as the table is open. A session's global resources, and therefore
-- resources that are inherited by the table, will only be released once the
-- session is sure that no tables are open anymore.
data TableState m h =
    TableOpen !(TableEnv m h)
  | TableClosed

data TableEnv m h = TableEnv {
    -- === Session-inherited

    -- | Use this instead of 'tableSession' for easy access. An open table may
    -- assume that its session is open.
    forall (m :: * -> *) h. TableEnv m h -> SessionEnv m h
tableSessionEnv :: !(SessionEnv m h)

    -- === Table-specific

    -- | All of the state being in a single 'StrictMVar' is a relatively simple
    -- solution, but there could be more concurrency. For example, while inserts
    -- are in progress, lookups could still look at the old state without
    -- waiting for the MVar.
    --
    -- TODO: switch to more fine-grained synchronisation approach
  , forall (m :: * -> *) h. TableEnv m h -> RWVar m (TableContent m h)
tableContent    :: !(RWVar m (TableContent m h))
  }

{-# INLINE tableSessionRoot #-}
 -- | Inherited from session for ease of access.
tableSessionRoot :: TableEnv m h -> SessionRoot
tableSessionRoot :: forall (m :: * -> *) h. TableEnv m h -> SessionRoot
tableSessionRoot = SessionEnv m h -> SessionRoot
forall (m :: * -> *) h. SessionEnv m h -> SessionRoot
sessionRoot (SessionEnv m h -> SessionRoot)
-> (TableEnv m h -> SessionEnv m h) -> TableEnv m h -> SessionRoot
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TableEnv m h -> SessionEnv m h
forall (m :: * -> *) h. TableEnv m h -> SessionEnv m h
tableSessionEnv

{-# INLINE tableHasFS #-}
-- | Inherited from session for ease of access.
tableHasFS :: TableEnv m h -> HasFS m h
tableHasFS :: forall (m :: * -> *) h. TableEnv m h -> HasFS m h
tableHasFS = SessionEnv m h -> HasFS m h
forall (m :: * -> *) h. SessionEnv m h -> HasFS m h
sessionHasFS (SessionEnv m h -> HasFS m h)
-> (TableEnv m h -> SessionEnv m h) -> TableEnv m h -> HasFS m h
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TableEnv m h -> SessionEnv m h
forall (m :: * -> *) h. TableEnv m h -> SessionEnv m h
tableSessionEnv

{-# INLINE tableHasBlockIO #-}
-- | Inherited from session for ease of access.
tableHasBlockIO :: TableEnv m h -> HasBlockIO m h
tableHasBlockIO :: forall (m :: * -> *) h. TableEnv m h -> HasBlockIO m h
tableHasBlockIO = SessionEnv m h -> HasBlockIO m h
forall (m :: * -> *) h. SessionEnv m h -> HasBlockIO m h
sessionHasBlockIO (SessionEnv m h -> HasBlockIO m h)
-> (TableEnv m h -> SessionEnv m h)
-> TableEnv m h
-> HasBlockIO m h
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TableEnv m h -> SessionEnv m h
forall (m :: * -> *) h. TableEnv m h -> SessionEnv m h
tableSessionEnv

{-# INLINE tableSessionUniqCounter #-}
-- | Inherited from session for ease of access.
tableSessionUniqCounter :: TableEnv m h -> UniqCounter m
tableSessionUniqCounter :: forall (m :: * -> *) h. TableEnv m h -> UniqCounter m
tableSessionUniqCounter = SessionEnv m h -> UniqCounter m
forall (m :: * -> *) h. SessionEnv m h -> UniqCounter m
sessionUniqCounter (SessionEnv m h -> UniqCounter m)
-> (TableEnv m h -> SessionEnv m h)
-> TableEnv m h
-> UniqCounter m
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TableEnv m h -> SessionEnv m h
forall (m :: * -> *) h. TableEnv m h -> SessionEnv m h
tableSessionEnv

{-# INLINE tableSessionUntrackTable #-}
{-# SPECIALISE tableSessionUntrackTable :: TableId -> TableEnv IO h -> IO () #-}
-- | Open tables are tracked in the corresponding session, so when a table is
-- closed it should become untracked (forgotten).
tableSessionUntrackTable :: MonadMVar m => TableId -> TableEnv m h -> m ()
tableSessionUntrackTable :: forall (m :: * -> *) h.
MonadMVar m =>
TableId -> TableEnv m h -> m ()
tableSessionUntrackTable TableId
tableId TableEnv m h
tEnv =
    StrictMVar m (Map TableId (Table m h))
-> (Map TableId (Table m h) -> m (Map TableId (Table m h))) -> m ()
forall (m :: * -> *) a.
MonadMVar m =>
StrictMVar m a -> (a -> m a) -> m ()
modifyMVar_ (SessionEnv m h -> StrictMVar m (Map TableId (Table m h))
forall (m :: * -> *) h.
SessionEnv m h -> StrictMVar m (Map TableId (Table m h))
sessionOpenTables (TableEnv m h -> SessionEnv m h
forall (m :: * -> *) h. TableEnv m h -> SessionEnv m h
tableSessionEnv TableEnv m h
tEnv)) ((Map TableId (Table m h) -> m (Map TableId (Table m h))) -> m ())
-> (Map TableId (Table m h) -> m (Map TableId (Table m h))) -> m ()
forall a b. (a -> b) -> a -> b
$ Map TableId (Table m h) -> m (Map TableId (Table m h))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Map TableId (Table m h) -> m (Map TableId (Table m h)))
-> (Map TableId (Table m h) -> Map TableId (Table m h))
-> Map TableId (Table m h)
-> m (Map TableId (Table m h))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TableId -> Map TableId (Table m h) -> Map TableId (Table m h)
forall k a. Ord k => k -> Map k a -> Map k a
Map.delete TableId
tableId

-- | The table is closed.
data TableClosedError
    = ErrTableClosed
    deriving stock (Int -> TableClosedError -> ShowS
[TableClosedError] -> ShowS
TableClosedError -> String
(Int -> TableClosedError -> ShowS)
-> (TableClosedError -> String)
-> ([TableClosedError] -> ShowS)
-> Show TableClosedError
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> TableClosedError -> ShowS
showsPrec :: Int -> TableClosedError -> ShowS
$cshow :: TableClosedError -> String
show :: TableClosedError -> String
$cshowList :: [TableClosedError] -> ShowS
showList :: [TableClosedError] -> ShowS
Show, TableClosedError -> TableClosedError -> Bool
(TableClosedError -> TableClosedError -> Bool)
-> (TableClosedError -> TableClosedError -> Bool)
-> Eq TableClosedError
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: TableClosedError -> TableClosedError -> Bool
== :: TableClosedError -> TableClosedError -> Bool
$c/= :: TableClosedError -> TableClosedError -> Bool
/= :: TableClosedError -> TableClosedError -> Bool
Eq)
    deriving anyclass (Show TableClosedError
Typeable TableClosedError
(Typeable TableClosedError, Show TableClosedError) =>
(TableClosedError -> SomeException)
-> (SomeException -> Maybe TableClosedError)
-> (TableClosedError -> String)
-> Exception TableClosedError
SomeException -> Maybe TableClosedError
TableClosedError -> String
TableClosedError -> SomeException
forall e.
(Typeable e, Show e) =>
(e -> SomeException)
-> (SomeException -> Maybe e) -> (e -> String) -> Exception e
$ctoException :: TableClosedError -> SomeException
toException :: TableClosedError -> SomeException
$cfromException :: SomeException -> Maybe TableClosedError
fromException :: SomeException -> Maybe TableClosedError
$cdisplayException :: TableClosedError -> String
displayException :: TableClosedError -> String
Exception)

-- | 'withOpenTable' ensures that the table stays open for the duration of the
-- provided continuation.
--
-- NOTE: any operation except 'close' can use this function.
{-# INLINE withOpenTable #-}
{-# SPECIALISE withOpenTable ::
     Table IO h
  -> (TableEnv IO h -> IO a)
  -> IO a #-}
withOpenTable ::
     (MonadSTM m, MonadThrow m)
  => Table m h
  -> (TableEnv m h -> m a)
  -> m a
withOpenTable :: forall (m :: * -> *) h a.
(MonadSTM m, MonadThrow m) =>
Table m h -> (TableEnv m h -> m a) -> m a
withOpenTable Table m h
t TableEnv m h -> m a
action = RWVar m (TableState m h) -> (TableState m h -> m a) -> m a
forall (m :: * -> *) a b.
(MonadSTM m, MonadThrow m) =>
RWVar m a -> (a -> m b) -> m b
RW.withReadAccess (Table m h -> RWVar m (TableState m h)
forall (m :: * -> *) h. Table m h -> RWVar m (TableState m h)
tableState Table m h
t) ((TableState m h -> m a) -> m a) -> (TableState m h -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ \case
    TableState m h
TableClosed -> TableClosedError -> m a
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO TableClosedError
ErrTableClosed
    TableOpen TableEnv m h
tEnv -> TableEnv m h -> m a
action TableEnv m h
tEnv

--
-- Implementation of public API
--

{-# SPECIALISE withTable ::
     Session IO h
  -> TableConfig
  -> (Table IO h -> IO a)
  -> IO a #-}
-- | See 'Database.LSMTree.Normal.withTable'.
withTable ::
     (MonadMask m, MonadSTM m, MonadMVar m, PrimMonad m)
  => Session m h
  -> TableConfig
  -> (Table m h -> m a)
  -> m a
withTable :: forall (m :: * -> *) h a.
(MonadMask m, MonadSTM m, MonadMVar m, PrimMonad m) =>
Session m h -> TableConfig -> (Table m h -> m a) -> m a
withTable Session m h
sesh TableConfig
conf = m (Table m h) -> (Table m h -> m ()) -> (Table m h -> m a) -> m a
forall a b c. m a -> (a -> m b) -> (a -> m c) -> m c
forall (m :: * -> *) a b c.
MonadThrow m =>
m a -> (a -> m b) -> (a -> m c) -> m c
bracket (Session m h -> TableConfig -> m (Table m h)
forall (m :: * -> *) h.
(MonadSTM m, MonadMVar m, PrimMonad m, MonadMask m) =>
Session m h -> TableConfig -> m (Table m h)
new Session m h
sesh TableConfig
conf) Table m h -> m ()
forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadMVar m, PrimMonad m) =>
Table m h -> m ()
close

{-# SPECIALISE new ::
     Session IO h
  -> TableConfig
  -> IO (Table IO h) #-}
-- | See 'Database.LSMTree.Normal.new'.
new ::
     (MonadSTM m, MonadMVar m, PrimMonad m, MonadMask m)
  => Session m h
  -> TableConfig
  -> m (Table m h)
new :: forall (m :: * -> *) h.
(MonadSTM m, MonadMVar m, PrimMonad m, MonadMask m) =>
Session m h -> TableConfig -> m (Table m h)
new Session m h
sesh TableConfig
conf = do
    Tracer m LSMTreeTrace -> LSMTreeTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith (Session m h -> Tracer m LSMTreeTrace
forall (m :: * -> *) h. Session m h -> Tracer m LSMTreeTrace
sessionTracer Session m h
sesh) LSMTreeTrace
TraceNewTable
    Session m h -> (SessionEnv m h -> m (Table m h)) -> m (Table m h)
forall (m :: * -> *) h a.
(MonadSTM m, MonadThrow m) =>
Session m h -> (SessionEnv m h -> m a) -> m a
withOpenSession Session m h
sesh ((SessionEnv m h -> m (Table m h)) -> m (Table m h))
-> (SessionEnv m h -> m (Table m h)) -> m (Table m h)
forall a b. (a -> b) -> a -> b
$ \SessionEnv m h
seshEnv ->
      (ActionRegistry m -> m (Table m h)) -> m (Table m h)
forall (m :: * -> *) a.
(PrimMonad m, MonadCatch m) =>
(ActionRegistry m -> m a) -> m a
withActionRegistry ((ActionRegistry m -> m (Table m h)) -> m (Table m h))
-> (ActionRegistry m -> m (Table m h)) -> m (Table m h)
forall a b. (a -> b) -> a -> b
$ \ActionRegistry m
reg -> do
        ArenaManager (PrimState m)
am <- m (ArenaManager (PrimState m))
forall (m :: * -> *). PrimMonad m => m (ArenaManager (PrimState m))
newArenaManager
        TableContent m h
tc <- SessionEnv m h -> ActionRegistry m -> m (TableContent m h)
forall (m :: * -> *) h.
(PrimMonad m, MonadMask m, MonadMVar m) =>
SessionEnv m h -> ActionRegistry m -> m (TableContent m h)
newEmptyTableContent SessionEnv m h
seshEnv ActionRegistry m
reg
        ActionRegistry m
-> Session m h
-> SessionEnv m h
-> TableConfig
-> ArenaManager (PrimState m)
-> TableContent m h
-> m (Table m h)
forall (m :: * -> *) h.
(MonadSTM m, MonadMVar m, PrimMonad m) =>
ActionRegistry m
-> Session m h
-> SessionEnv m h
-> TableConfig
-> ArenaManager (PrimState m)
-> TableContent m h
-> m (Table m h)
newWith ActionRegistry m
reg Session m h
sesh SessionEnv m h
seshEnv TableConfig
conf ArenaManager (PrimState m)
am TableContent m h
tc

{-# SPECIALISE newEmptyTableContent ::
     SessionEnv IO h
  -> ActionRegistry IO
  -> IO (TableContent IO h) #-}
newEmptyTableContent ::
     (PrimMonad m, MonadMask m, MonadMVar m)
  => SessionEnv m h
  -> ActionRegistry m
  -> m (TableContent m h)
newEmptyTableContent :: forall (m :: * -> *) h.
(PrimMonad m, MonadMask m, MonadMVar m) =>
SessionEnv m h -> ActionRegistry m -> m (TableContent m h)
newEmptyTableContent SessionEnv m h
seshEnv ActionRegistry m
reg = do
    FsPath
blobpath <- SessionRoot -> Unique -> FsPath
Paths.tableBlobPath (SessionEnv m h -> SessionRoot
forall (m :: * -> *) h. SessionEnv m h -> SessionRoot
sessionRoot SessionEnv m h
seshEnv) (Unique -> FsPath) -> m Unique -> m FsPath
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
                  UniqCounter m -> m Unique
forall (m :: * -> *). PrimMonad m => UniqCounter m -> m Unique
incrUniqCounter (SessionEnv m h -> UniqCounter m
forall (m :: * -> *) h. SessionEnv m h -> UniqCounter m
sessionUniqCounter SessionEnv m h
seshEnv)
    let tableWriteBuffer :: WriteBuffer
tableWriteBuffer = WriteBuffer
WB.empty
    Ref (WriteBufferBlobs m h)
tableWriteBufferBlobs
      <- ActionRegistry m
-> m (Ref (WriteBufferBlobs m h))
-> (Ref (WriteBufferBlobs m h) -> m ())
-> m (Ref (WriteBufferBlobs m h))
forall (m :: * -> *) a.
(PrimMonad m, MonadMask m, HasCallStack) =>
ActionRegistry m -> m a -> (a -> m ()) -> m a
withRollback ActionRegistry m
reg
           (HasFS m h -> FsPath -> m (Ref (WriteBufferBlobs m h))
forall (m :: * -> *) h.
(PrimMonad m, MonadMask m) =>
HasFS m h -> FsPath -> m (Ref (WriteBufferBlobs m h))
WBB.new (SessionEnv m h -> HasFS m h
forall (m :: * -> *) h. SessionEnv m h -> HasFS m h
sessionHasFS SessionEnv m h
seshEnv) FsPath
blobpath)
           Ref (WriteBufferBlobs m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef
    let tableLevels :: Vector a
tableLevels = Vector a
forall a. Vector a
V.empty
    LevelsCache m h
tableCache <- ActionRegistry m -> Levels m h -> m (LevelsCache m h)
forall (m :: * -> *) h.
(PrimMonad m, MonadMVar m, MonadMask m) =>
ActionRegistry m -> Levels m h -> m (LevelsCache m h)
mkLevelsCache ActionRegistry m
reg Levels m h
forall a. Vector a
tableLevels
    TableContent m h -> m (TableContent m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure TableContent {
      WriteBuffer
tableWriteBuffer :: WriteBuffer
tableWriteBuffer :: WriteBuffer
tableWriteBuffer
    , Ref (WriteBufferBlobs m h)
tableWriteBufferBlobs :: Ref (WriteBufferBlobs m h)
tableWriteBufferBlobs :: Ref (WriteBufferBlobs m h)
tableWriteBufferBlobs
    , Levels m h
forall a. Vector a
tableLevels :: forall a. Vector a
tableLevels :: Levels m h
tableLevels
    , LevelsCache m h
tableCache :: LevelsCache m h
tableCache :: LevelsCache m h
tableCache
    , tableUnionLevel :: UnionLevel m h
tableUnionLevel = UnionLevel m h
forall (m :: * -> *) h. UnionLevel m h
NoUnion
    }


{-# SPECIALISE newWith ::
     ActionRegistry IO
  -> Session IO h
  -> SessionEnv IO h
  -> TableConfig
  -> ArenaManager RealWorld
  -> TableContent IO h
  -> IO (Table IO h) #-}
newWith ::
     (MonadSTM m, MonadMVar m, PrimMonad m)
  => ActionRegistry m
  -> Session m h
  -> SessionEnv m h
  -> TableConfig
  -> ArenaManager (PrimState m)
  -> TableContent m h
  -> m (Table m h)
newWith :: forall (m :: * -> *) h.
(MonadSTM m, MonadMVar m, PrimMonad m) =>
ActionRegistry m
-> Session m h
-> SessionEnv m h
-> TableConfig
-> ArenaManager (PrimState m)
-> TableContent m h
-> m (Table m h)
newWith ActionRegistry m
reg Session m h
sesh SessionEnv m h
seshEnv TableConfig
conf !ArenaManager (PrimState m)
am !TableContent m h
tc = do
    TableId
tableId <- Unique -> TableId
uniqueToTableId (Unique -> TableId) -> m Unique -> m TableId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> UniqCounter m -> m Unique
forall (m :: * -> *). PrimMonad m => UniqCounter m -> m Unique
incrUniqCounter (SessionEnv m h -> UniqCounter m
forall (m :: * -> *) h. SessionEnv m h -> UniqCounter m
sessionUniqCounter SessionEnv m h
seshEnv)
    let tr :: Tracer m TableTrace
tr = TableId -> TableTrace -> LSMTreeTrace
TraceTable TableId
tableId (TableTrace -> LSMTreeTrace)
-> Tracer m LSMTreeTrace -> Tracer m TableTrace
forall a' a. (a' -> a) -> Tracer m a -> Tracer m a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
`contramap` Session m h -> Tracer m LSMTreeTrace
forall (m :: * -> *) h. Session m h -> Tracer m LSMTreeTrace
sessionTracer Session m h
sesh
    Tracer m TableTrace -> TableTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith Tracer m TableTrace
tr (TableTrace -> m ()) -> TableTrace -> m ()
forall a b. (a -> b) -> a -> b
$ TableConfig -> TableTrace
TraceCreateTable TableConfig
conf
    -- The session is kept open until we've updated the session's set of tracked
    -- tables. If 'closeSession' is called by another thread while this code
    -- block is being executed, that thread will block until it reads the
    -- /updated/ set of tracked tables.
    RWVar m (TableContent m h)
contentVar <- TableContent m h -> m (RWVar m (TableContent m h))
forall (m :: * -> *) a. MonadSTM m => a -> m (RWVar m a)
RW.new (TableContent m h -> m (RWVar m (TableContent m h)))
-> TableContent m h -> m (RWVar m (TableContent m h))
forall a b. (a -> b) -> a -> b
$ TableContent m h
tc
    RWVar m (TableState m h)
tableVar <- TableState m h -> m (RWVar m (TableState m h))
forall (m :: * -> *) a. MonadSTM m => a -> m (RWVar m a)
RW.new (TableState m h -> m (RWVar m (TableState m h)))
-> TableState m h -> m (RWVar m (TableState m h))
forall a b. (a -> b) -> a -> b
$ TableEnv m h -> TableState m h
forall (m :: * -> *) h. TableEnv m h -> TableState m h
TableOpen (TableEnv m h -> TableState m h) -> TableEnv m h -> TableState m h
forall a b. (a -> b) -> a -> b
$ TableEnv {
          tableSessionEnv :: SessionEnv m h
tableSessionEnv = SessionEnv m h
seshEnv
        , tableContent :: RWVar m (TableContent m h)
tableContent = RWVar m (TableContent m h)
contentVar
        }
    let !t :: Table m h
t = TableConfig
-> RWVar m (TableState m h)
-> ArenaManager (PrimState m)
-> Tracer m TableTrace
-> TableId
-> Session m h
-> Table m h
forall (m :: * -> *) h.
TableConfig
-> RWVar m (TableState m h)
-> ArenaManager (PrimState m)
-> Tracer m TableTrace
-> TableId
-> Session m h
-> Table m h
Table TableConfig
conf RWVar m (TableState m h)
tableVar ArenaManager (PrimState m)
am Tracer m TableTrace
tr TableId
tableId Session m h
sesh
    -- Track the current table
    ActionRegistry m -> m () -> m ()
forall (m :: * -> *).
(PrimMonad m, HasCallStack) =>
ActionRegistry m -> m () -> m ()
delayedCommit ActionRegistry m
reg (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
      StrictMVar m (Map TableId (Table m h))
-> (Map TableId (Table m h) -> m (Map TableId (Table m h))) -> m ()
forall (m :: * -> *) a.
MonadMVar m =>
StrictMVar m a -> (a -> m a) -> m ()
modifyMVar_ (SessionEnv m h -> StrictMVar m (Map TableId (Table m h))
forall (m :: * -> *) h.
SessionEnv m h -> StrictMVar m (Map TableId (Table m h))
sessionOpenTables SessionEnv m h
seshEnv) ((Map TableId (Table m h) -> m (Map TableId (Table m h))) -> m ())
-> (Map TableId (Table m h) -> m (Map TableId (Table m h))) -> m ()
forall a b. (a -> b) -> a -> b
$
        Map TableId (Table m h) -> m (Map TableId (Table m h))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Map TableId (Table m h) -> m (Map TableId (Table m h)))
-> (Map TableId (Table m h) -> Map TableId (Table m h))
-> Map TableId (Table m h)
-> m (Map TableId (Table m h))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TableId
-> Table m h -> Map TableId (Table m h) -> Map TableId (Table m h)
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert TableId
tableId Table m h
t
    Table m h -> m (Table m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Table m h -> m (Table m h)) -> Table m h -> m (Table m h)
forall a b. (a -> b) -> a -> b
$! Table m h
t

{-# SPECIALISE close :: Table IO h -> IO () #-}
-- | See 'Database.LSMTree.Normal.close'.
close ::
     (MonadMask m, MonadSTM m, MonadMVar m, PrimMonad m)
  => Table m h
  -> m ()
close :: forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadMVar m, PrimMonad m) =>
Table m h -> m ()
close Table m h
t = do
    Tracer m TableTrace -> TableTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith (Table m h -> Tracer m TableTrace
forall (m :: * -> *) h. Table m h -> Tracer m TableTrace
tableTracer Table m h
t) TableTrace
TraceCloseTable
    m (TableState m h)
-> (TableState m h -> m ())
-> (ActionRegistry m -> TableState m h -> m (TableState m h))
-> m ()
forall (m :: * -> *) st.
(PrimMonad m, MonadCatch m) =>
m st -> (st -> m ()) -> (ActionRegistry m -> st -> m st) -> m ()
modifyWithActionRegistry_
      (RWVar m (TableState m h) -> m (TableState m h)
forall (m :: * -> *) a.
(MonadSTM m, MonadCatch m) =>
RWVar m a -> m a
RW.unsafeAcquireWriteAccess (Table m h -> RWVar m (TableState m h)
forall (m :: * -> *) h. Table m h -> RWVar m (TableState m h)
tableState Table m h
t))
      (STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ())
-> (TableState m h -> STM m ()) -> TableState m h -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. RWVar m (TableState m h) -> TableState m h -> STM m ()
forall (m :: * -> *) a. MonadSTM m => RWVar m a -> a -> STM m ()
RW.unsafeReleaseWriteAccess (Table m h -> RWVar m (TableState m h)
forall (m :: * -> *) h. Table m h -> RWVar m (TableState m h)
tableState Table m h
t)) ((ActionRegistry m -> TableState m h -> m (TableState m h))
 -> m ())
-> (ActionRegistry m -> TableState m h -> m (TableState m h))
-> m ()
forall a b. (a -> b) -> a -> b
$ \ActionRegistry m
reg -> \case
      TableState m h
TableClosed -> TableState m h -> m (TableState m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure TableState m h
forall (m :: * -> *) h. TableState m h
TableClosed
      TableOpen TableEnv m h
tEnv -> do
        -- Since we have a write lock on the table state, we know that we are the
        -- only thread currently closing the table. We can safely make the session
        -- forget about this table.
        ActionRegistry m -> m () -> m ()
forall (m :: * -> *).
(PrimMonad m, HasCallStack) =>
ActionRegistry m -> m () -> m ()
delayedCommit ActionRegistry m
reg (TableId -> TableEnv m h -> m ()
forall (m :: * -> *) h.
MonadMVar m =>
TableId -> TableEnv m h -> m ()
tableSessionUntrackTable (Table m h -> TableId
forall (m :: * -> *) h. Table m h -> TableId
tableId Table m h
t) TableEnv m h
tEnv)
        RWVar m (TableContent m h)
-> (TableContent m h -> m (TableContent m h)) -> m ()
forall (m :: * -> *) a.
(MonadSTM m, MonadCatch m) =>
RWVar m a -> (a -> m a) -> m ()
RW.withWriteAccess_ (TableEnv m h -> RWVar m (TableContent m h)
forall (m :: * -> *) h. TableEnv m h -> RWVar m (TableContent m h)
tableContent TableEnv m h
tEnv) ((TableContent m h -> m (TableContent m h)) -> m ())
-> (TableContent m h -> m (TableContent m h)) -> m ()
forall a b. (a -> b) -> a -> b
$ \TableContent m h
tc -> do
          ActionRegistry m -> TableContent m h -> m ()
forall (m :: * -> *) h.
(PrimMonad m, MonadMask m) =>
ActionRegistry m -> TableContent m h -> m ()
releaseTableContent ActionRegistry m
reg TableContent m h
tc
          TableContent m h -> m (TableContent m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure TableContent m h
tc
        TableState m h -> m (TableState m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure TableState m h
forall (m :: * -> *) h. TableState m h
TableClosed

{-# SPECIALISE lookups ::
     ResolveSerialisedValue
  -> V.Vector SerialisedKey
  -> Table IO h
  -> IO (V.Vector (Maybe (Entry SerialisedValue (WeakBlobRef IO h)))) #-}
-- | See 'Database.LSMTree.Normal.lookups'.
lookups ::
     (MonadAsync m, MonadMask m, MonadMVar m, MonadST m)
  => ResolveSerialisedValue
  -> V.Vector SerialisedKey
  -> Table m h
  -> m (V.Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
lookups :: forall (m :: * -> *) h.
(MonadAsync m, MonadMask m, MonadMVar m, MonadST m) =>
ResolveSerialisedValue
-> Vector SerialisedKey
-> Table m h
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
lookups ResolveSerialisedValue
resolve Vector SerialisedKey
ks Table m h
t = do
    Tracer m TableTrace -> TableTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith (Table m h -> Tracer m TableTrace
forall (m :: * -> *) h. Table m h -> Tracer m TableTrace
tableTracer Table m h
t) (TableTrace -> m ()) -> TableTrace -> m ()
forall a b. (a -> b) -> a -> b
$ Int -> TableTrace
TraceLookups (Vector SerialisedKey -> Int
forall a. Vector a -> Int
V.length Vector SerialisedKey
ks)
    Table m h
-> (TableEnv m h
    -> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
forall (m :: * -> *) h a.
(MonadSTM m, MonadThrow m) =>
Table m h -> (TableEnv m h -> m a) -> m a
withOpenTable Table m h
t ((TableEnv m h
  -> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
 -> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
-> (TableEnv m h
    -> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
forall a b. (a -> b) -> a -> b
$ \TableEnv m h
tEnv ->
      RWVar m (TableContent m h)
-> (TableContent m h
    -> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
forall (m :: * -> *) a b.
(MonadSTM m, MonadThrow m) =>
RWVar m a -> (a -> m b) -> m b
RW.withReadAccess (TableEnv m h -> RWVar m (TableContent m h)
forall (m :: * -> *) h. TableEnv m h -> RWVar m (TableContent m h)
tableContent TableEnv m h
tEnv) ((TableContent m h
  -> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
 -> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
-> (TableContent m h
    -> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
forall a b. (a -> b) -> a -> b
$ \TableContent m h
tableContent -> do
        case TableContent m h -> UnionLevel m h
forall (m :: * -> *) h. TableContent m h -> UnionLevel m h
tableUnionLevel TableContent m h
tableContent of
          UnionLevel m h
NoUnion -> TableEnv m h
-> TableContent m h
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
regularLevelLookups TableEnv m h
tEnv TableContent m h
tableContent
          Union Ref (MergingTree m h)
tree -> do
            Ref (MergingTree m h) -> m Bool
forall (m :: * -> *) h.
MonadMVar m =>
Ref (MergingTree m h) -> m Bool
isStructurallyEmpty Ref (MergingTree m h)
tree m Bool
-> (Bool
    -> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
              Bool
True  -> TableEnv m h
-> TableContent m h
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
regularLevelLookups TableEnv m h
tEnv TableContent m h
tableContent
              Bool
False ->
                -- TODO: the blob refs returned from the tree can be invalidated
                -- by supplyUnionCredits or other operations on any table that
                -- shares merging runs or trees. We need to keep open the runs!
                -- This could be achieved by storing the LookupTree and only
                -- calling MT.releaseLookupTree later, when we are okay with
                -- invalidating the blob refs (similar to the LevelsCache).
                -- Lookups then use the cached tree, but when should we rebuild
                -- the tree? On each call to supplyUnionCredits?
                (ActionRegistry m
 -> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
forall (m :: * -> *) a.
(PrimMonad m, MonadCatch m) =>
(ActionRegistry m -> m a) -> m a
withActionRegistry ((ActionRegistry m
  -> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
 -> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
-> (ActionRegistry m
    -> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
forall a b. (a -> b) -> a -> b
$ \ActionRegistry m
reg -> do
                  Async m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
regularResult <-
                    -- asynchronously, so tree lookup batches can already be
                    -- submitted without waiting for the result.
                    m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
-> m (Async
        m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
forall a. m a -> m (Async m a)
forall (m :: * -> *) a. MonadAsync m => m a -> m (Async m a)
Async.async (m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
 -> m (Async
         m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))))
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
-> m (Async
        m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
forall a b. (a -> b) -> a -> b
$ TableEnv m h
-> TableContent m h
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
regularLevelLookups TableEnv m h
tEnv TableContent m h
tableContent
                  LookupTree (Vector (Ref (Run m h)))
treeBatches <- ActionRegistry m
-> Ref (MergingTree m h) -> m (LookupTree (Vector (Ref (Run m h))))
forall (m :: * -> *) h.
(PrimMonad m, MonadMVar m, MonadMask m) =>
ActionRegistry m
-> Ref (MergingTree m h) -> m (LookupTree (Vector (Ref (Run m h))))
MT.buildLookupTree ActionRegistry m
reg Ref (MergingTree m h)
tree
                  LookupTree
  (Async
     m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
treeResults <- LookupTree (Vector (Ref (Run m h)))
-> (Vector (Ref (Run m h))
    -> m (Async
            m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))))
-> m (LookupTree
        (Async
           m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))))
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM LookupTree (Vector (Ref (Run m h)))
treeBatches ((Vector (Ref (Run m h))
  -> m (Async
          m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))))
 -> m (LookupTree
         (Async
            m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))))
-> (Vector (Ref (Run m h))
    -> m (Async
            m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))))
-> m (LookupTree
        (Async
           m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))))
forall a b. (a -> b) -> a -> b
$ \Vector (Ref (Run m h))
runs ->
                    m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
-> m (Async
        m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
forall a. m a -> m (Async m a)
forall (m :: * -> *) a. MonadAsync m => m a -> m (Async m a)
Async.async (m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
 -> m (Async
         m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))))
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
-> m (Async
        m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
forall a b. (a -> b) -> a -> b
$ TableEnv m h
-> Vector (Ref (Run m h))
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
treeBatchLookups TableEnv m h
tEnv Vector (Ref (Run m h))
runs
                  -- TODO: if regular levels are empty, don't add them to tree
                  Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))
res <- ResolveSerialisedValue
-> LookupTree
     (Async
        m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
forall (m :: * -> *) h.
MonadAsync m =>
ResolveSerialisedValue
-> LookupTree (Async m (LookupAcc m h)) -> m (LookupAcc m h)
MT.foldLookupTree ResolveSerialisedValue
resolve (LookupTree
   (Async
      m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
 -> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
-> LookupTree
     (Async
        m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
forall a b. (a -> b) -> a -> b
$
                    TreeMergeType
-> Vector
     (LookupTree
        (Async
           m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))))
-> LookupTree
     (Async
        m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
forall a. TreeMergeType -> Vector (LookupTree a) -> LookupTree a
MT.mkLookupNode TreeMergeType
MR.MergeLevel (Vector
   (LookupTree
      (Async
         m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))))
 -> LookupTree
      (Async
         m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))))
-> Vector
     (LookupTree
        (Async
           m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))))
-> LookupTree
     (Async
        m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
forall a b. (a -> b) -> a -> b
$ [LookupTree
   (Async
      m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))]
-> Vector
     (LookupTree
        (Async
           m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))))
forall a. [a] -> Vector a
V.fromList
                      [ Async m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
-> LookupTree
     (Async
        m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
forall a. a -> LookupTree a
MT.LookupBatch Async m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
regularResult
                      , LookupTree
  (Async
     m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))))
treeResults
                      ]
                  ActionRegistry m -> LookupTree (Vector (Ref (Run m h))) -> m ()
forall (m :: * -> *) h.
(PrimMonad m, MonadMask m) =>
ActionRegistry m -> LookupTree (Vector (Ref (Run m h))) -> m ()
MT.releaseLookupTree ActionRegistry m
reg LookupTree (Vector (Ref (Run m h)))
treeBatches
                  Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h)))
res
  where
    regularLevelLookups :: TableEnv m h
-> TableContent m h
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
regularLevelLookups TableEnv m h
tEnv TableContent m h
tableContent = do
        let !cache :: LevelsCache m h
cache = TableContent m h -> LevelsCache m h
forall (m :: * -> *) h. TableContent m h -> LevelsCache m h
tableCache TableContent m h
tableContent
        HasBlockIO m h
-> ArenaManager (PrimState m)
-> ResolveSerialisedValue
-> WriteBuffer
-> Ref (WriteBufferBlobs m h)
-> Vector (Ref (Run m h))
-> Vector (Bloom SerialisedKey)
-> Vector Index
-> Vector (Handle h)
-> Vector SerialisedKey
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
forall (m :: * -> *) h.
(MonadThrow m, MonadST m) =>
HasBlockIO m h
-> ArenaManager (PrimState m)
-> ResolveSerialisedValue
-> WriteBuffer
-> Ref (WriteBufferBlobs m h)
-> Vector (Ref (Run m h))
-> Vector (Bloom SerialisedKey)
-> Vector Index
-> Vector (Handle h)
-> Vector SerialisedKey
-> m (LookupAcc m h)
lookupsIOWithWriteBuffer
          (TableEnv m h -> HasBlockIO m h
forall (m :: * -> *) h. TableEnv m h -> HasBlockIO m h
tableHasBlockIO TableEnv m h
tEnv)
          (Table m h -> ArenaManager (PrimState m)
forall (m :: * -> *) h. Table m h -> ArenaManager (PrimState m)
tableArenaManager Table m h
t)
          ResolveSerialisedValue
resolve
          (TableContent m h -> WriteBuffer
forall (m :: * -> *) h. TableContent m h -> WriteBuffer
tableWriteBuffer TableContent m h
tableContent)
          (TableContent m h -> Ref (WriteBufferBlobs m h)
forall (m :: * -> *) h.
TableContent m h -> Ref (WriteBufferBlobs m h)
tableWriteBufferBlobs TableContent m h
tableContent)
          (LevelsCache m h -> Vector (Ref (Run m h))
forall (m :: * -> *) h. LevelsCache m h -> Vector (Ref (Run m h))
cachedRuns LevelsCache m h
cache)
          (LevelsCache m h -> Vector (Bloom SerialisedKey)
forall (m :: * -> *) h.
LevelsCache m h -> Vector (Bloom SerialisedKey)
cachedFilters LevelsCache m h
cache)
          (LevelsCache m h -> Vector Index
forall (m :: * -> *) h. LevelsCache m h -> Vector Index
cachedIndexes LevelsCache m h
cache)
          (LevelsCache m h -> Vector (Handle h)
forall (m :: * -> *) h. LevelsCache m h -> Vector (Handle h)
cachedKOpsFiles LevelsCache m h
cache)
          Vector SerialisedKey
ks

    treeBatchLookups :: TableEnv m h
-> Vector (Ref (Run m h))
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
treeBatchLookups TableEnv m h
tEnv Vector (Ref (Run m h))
runs =
        HasBlockIO m h
-> ArenaManager (PrimState m)
-> ResolveSerialisedValue
-> Vector (Ref (Run m h))
-> Vector (Bloom SerialisedKey)
-> Vector Index
-> Vector (Handle h)
-> Vector SerialisedKey
-> m (Vector (Maybe (Entry SerialisedValue (WeakBlobRef m h))))
forall (m :: * -> *) h.
(MonadThrow m, MonadST m) =>
HasBlockIO m h
-> ArenaManager (PrimState m)
-> ResolveSerialisedValue
-> Vector (Ref (Run m h))
-> Vector (Bloom SerialisedKey)
-> Vector Index
-> Vector (Handle h)
-> Vector SerialisedKey
-> m (LookupAcc m h)
lookupsIO
          (TableEnv m h -> HasBlockIO m h
forall (m :: * -> *) h. TableEnv m h -> HasBlockIO m h
tableHasBlockIO TableEnv m h
tEnv)
          (Table m h -> ArenaManager (PrimState m)
forall (m :: * -> *) h. Table m h -> ArenaManager (PrimState m)
tableArenaManager Table m h
t)
          ResolveSerialisedValue
resolve
          Vector (Ref (Run m h))
runs
          ((Ref (Run m h) -> Bloom SerialisedKey)
-> Vector (Ref (Run m h)) -> Vector (Bloom SerialisedKey)
forall a b. (a -> b) -> Vector a -> Vector b
V.mapStrict (\(DeRef Run m h
r) -> Run m h -> Bloom SerialisedKey
forall (m :: * -> *) h. Run m h -> Bloom SerialisedKey
Run.runFilter   Run m h
r) Vector (Ref (Run m h))
runs)
          ((Ref (Run m h) -> Index) -> Vector (Ref (Run m h)) -> Vector Index
forall a b. (a -> b) -> Vector a -> Vector b
V.mapStrict (\(DeRef Run m h
r) -> Run m h -> Index
forall (m :: * -> *) h. Run m h -> Index
Run.runIndex    Run m h
r) Vector (Ref (Run m h))
runs)
          ((Ref (Run m h) -> Handle h)
-> Vector (Ref (Run m h)) -> Vector (Handle h)
forall a b. (a -> b) -> Vector a -> Vector b
V.mapStrict (\(DeRef Run m h
r) -> Run m h -> Handle h
forall (m :: * -> *) h. Run m h -> Handle h
Run.runKOpsFile Run m h
r) Vector (Ref (Run m h))
runs)
          Vector SerialisedKey
ks

{-# SPECIALISE rangeLookup ::
     ResolveSerialisedValue
  -> Range SerialisedKey
  -> Table IO h
  -> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef IO h) -> res)
  -> IO (V.Vector res) #-}
-- | See 'Database.LSMTree.Normal.rangeLookup'.
rangeLookup ::
     (MonadMask m, MonadMVar m, MonadST m, MonadSTM m)
  => ResolveSerialisedValue
  -> Range SerialisedKey
  -> Table m h
  -> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef m h) -> res)
     -- ^ How to map to a query result, different for normal/monoidal
  -> m (V.Vector res)
rangeLookup :: forall (m :: * -> *) h res.
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m) =>
ResolveSerialisedValue
-> Range SerialisedKey
-> Table m h
-> (SerialisedKey
    -> SerialisedValue -> Maybe (WeakBlobRef m h) -> res)
-> m (Vector res)
rangeLookup ResolveSerialisedValue
resolve Range SerialisedKey
range Table m h
t SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef m h) -> res
fromEntry = do
    Tracer m TableTrace -> TableTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith (Table m h -> Tracer m TableTrace
forall (m :: * -> *) h. Table m h -> Tracer m TableTrace
tableTracer Table m h
t) (TableTrace -> m ()) -> TableTrace -> m ()
forall a b. (a -> b) -> a -> b
$ Range SerialisedKey -> TableTrace
TraceRangeLookup Range SerialisedKey
range
    case Range SerialisedKey
range of
      FromToExcluding SerialisedKey
lb SerialisedKey
ub ->
        OffsetKey
-> Table m h -> (Cursor m h -> m (Vector res)) -> m (Vector res)
forall (m :: * -> *) h a.
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m) =>
OffsetKey -> Table m h -> (Cursor m h -> m a) -> m a
withCursor (SerialisedKey -> OffsetKey
OffsetKey SerialisedKey
lb) Table m h
t ((Cursor m h -> m (Vector res)) -> m (Vector res))
-> (Cursor m h -> m (Vector res)) -> m (Vector res)
forall a b. (a -> b) -> a -> b
$ \Cursor m h
cursor ->
          Cursor m h
-> (SerialisedKey -> Bool) -> [Vector res] -> m (Vector res)
go Cursor m h
cursor (SerialisedKey -> SerialisedKey -> Bool
forall a. Ord a => a -> a -> Bool
< SerialisedKey
ub) []
      FromToIncluding SerialisedKey
lb SerialisedKey
ub ->
        OffsetKey
-> Table m h -> (Cursor m h -> m (Vector res)) -> m (Vector res)
forall (m :: * -> *) h a.
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m) =>
OffsetKey -> Table m h -> (Cursor m h -> m a) -> m a
withCursor (SerialisedKey -> OffsetKey
OffsetKey SerialisedKey
lb) Table m h
t ((Cursor m h -> m (Vector res)) -> m (Vector res))
-> (Cursor m h -> m (Vector res)) -> m (Vector res)
forall a b. (a -> b) -> a -> b
$ \Cursor m h
cursor ->
          Cursor m h
-> (SerialisedKey -> Bool) -> [Vector res] -> m (Vector res)
go Cursor m h
cursor (SerialisedKey -> SerialisedKey -> Bool
forall a. Ord a => a -> a -> Bool
<= SerialisedKey
ub) []
  where
    -- TODO: tune!
    -- Also, such a high number means that many tests never cover the case
    -- of having multiple chunks. Expose through the public API as config?
    chunkSize :: Int
chunkSize = Int
500

    go :: Cursor m h
-> (SerialisedKey -> Bool) -> [Vector res] -> m (Vector res)
go Cursor m h
cursor SerialisedKey -> Bool
isInUpperBound ![Vector res]
chunks = do
      Vector res
chunk <- ResolveSerialisedValue
-> (SerialisedKey -> Bool)
-> Int
-> Cursor m h
-> (SerialisedKey
    -> SerialisedValue -> Maybe (WeakBlobRef m h) -> res)
-> m (Vector res)
forall (m :: * -> *) h res.
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m) =>
ResolveSerialisedValue
-> (SerialisedKey -> Bool)
-> Int
-> Cursor m h
-> (SerialisedKey
    -> SerialisedValue -> Maybe (WeakBlobRef m h) -> res)
-> m (Vector res)
readCursorWhile ResolveSerialisedValue
resolve SerialisedKey -> Bool
isInUpperBound Int
chunkSize Cursor m h
cursor SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef m h) -> res
fromEntry
      let !n :: Int
n = Vector res -> Int
forall a. Vector a -> Int
V.length Vector res
chunk
      if Int
n Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
chunkSize
        then Cursor m h
-> (SerialisedKey -> Bool) -> [Vector res] -> m (Vector res)
go Cursor m h
cursor SerialisedKey -> Bool
isInUpperBound (Vector res
chunk Vector res -> [Vector res] -> [Vector res]
forall a. a -> [a] -> [a]
: [Vector res]
chunks)
             -- This requires an extra copy. If we had a size hint, we could
             -- directly write everything into the result vector.
             -- TODO(optimise): revisit
        else Vector res -> m (Vector res)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return ([Vector res] -> Vector res
forall a. [Vector a] -> Vector a
V.concat ([Vector res] -> [Vector res]
forall a. [a] -> [a]
reverse (Int -> Int -> Vector res -> Vector res
forall a. Int -> Int -> Vector a -> Vector a
V.slice Int
0 Int
n Vector res
chunk Vector res -> [Vector res] -> [Vector res]
forall a. a -> [a] -> [a]
: [Vector res]
chunks)))

{-# SPECIALISE updates ::
     ResolveSerialisedValue
  -> V.Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)
  -> Table IO h
  -> IO () #-}
-- | See 'Database.LSMTree.Normal.updates'.
--
-- Does not enforce that mupsert and blobs should not occur in the same table.
updates ::
     (MonadMask m, MonadMVar m, MonadST m, MonadSTM m)
  => ResolveSerialisedValue
  -> V.Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)
  -> Table m h
  -> m ()
updates :: forall (m :: * -> *) h.
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m) =>
ResolveSerialisedValue
-> Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)
-> Table m h
-> m ()
updates ResolveSerialisedValue
resolve Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)
es Table m h
t = do
    Tracer m TableTrace -> TableTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith (Table m h -> Tracer m TableTrace
forall (m :: * -> *) h. Table m h -> Tracer m TableTrace
tableTracer Table m h
t) (TableTrace -> m ()) -> TableTrace -> m ()
forall a b. (a -> b) -> a -> b
$ Int -> TableTrace
TraceUpdates (Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob) -> Int
forall a. Vector a -> Int
V.length Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)
es)
    let conf :: TableConfig
conf = Table m h -> TableConfig
forall (m :: * -> *) h. Table m h -> TableConfig
tableConfig Table m h
t
    Table m h -> (TableEnv m h -> m ()) -> m ()
forall (m :: * -> *) h a.
(MonadSTM m, MonadThrow m) =>
Table m h -> (TableEnv m h -> m a) -> m a
withOpenTable Table m h
t ((TableEnv m h -> m ()) -> m ()) -> (TableEnv m h -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \TableEnv m h
tEnv -> do
      let hfs :: HasFS m h
hfs = TableEnv m h -> HasFS m h
forall (m :: * -> *) h. TableEnv m h -> HasFS m h
tableHasFS TableEnv m h
tEnv
      m (TableContent m h)
-> (TableContent m h -> m ())
-> (ActionRegistry m -> TableContent m h -> m (TableContent m h))
-> m ()
forall (m :: * -> *) st.
(PrimMonad m, MonadCatch m) =>
m st -> (st -> m ()) -> (ActionRegistry m -> st -> m st) -> m ()
modifyWithActionRegistry_
        (RWVar m (TableContent m h) -> m (TableContent m h)
forall (m :: * -> *) a.
(MonadSTM m, MonadCatch m) =>
RWVar m a -> m a
RW.unsafeAcquireWriteAccess (TableEnv m h -> RWVar m (TableContent m h)
forall (m :: * -> *) h. TableEnv m h -> RWVar m (TableContent m h)
tableContent TableEnv m h
tEnv))
        (STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ())
-> (TableContent m h -> STM m ()) -> TableContent m h -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. RWVar m (TableContent m h) -> TableContent m h -> STM m ()
forall (m :: * -> *) a. MonadSTM m => RWVar m a -> a -> STM m ()
RW.unsafeReleaseWriteAccess (TableEnv m h -> RWVar m (TableContent m h)
forall (m :: * -> *) h. TableEnv m h -> RWVar m (TableContent m h)
tableContent TableEnv m h
tEnv)) ((ActionRegistry m -> TableContent m h -> m (TableContent m h))
 -> m ())
-> (ActionRegistry m -> TableContent m h -> m (TableContent m h))
-> m ()
forall a b. (a -> b) -> a -> b
$ \ActionRegistry m
reg -> do
          Tracer m (AtLevel MergeTrace)
-> TableConfig
-> ResolveSerialisedValue
-> HasFS m h
-> HasBlockIO m h
-> SessionRoot
-> UniqCounter m
-> Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)
-> ActionRegistry m
-> TableContent m h
-> m (TableContent m h)
forall (m :: * -> *) h.
(MonadMask m, MonadMVar m, MonadSTM m, MonadST m) =>
Tracer m (AtLevel MergeTrace)
-> TableConfig
-> ResolveSerialisedValue
-> HasFS m h
-> HasBlockIO m h
-> SessionRoot
-> UniqCounter m
-> Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)
-> ActionRegistry m
-> TableContent m h
-> m (TableContent m h)
updatesWithInterleavedFlushes
            (AtLevel MergeTrace -> TableTrace
TraceMerge (AtLevel MergeTrace -> TableTrace)
-> Tracer m TableTrace -> Tracer m (AtLevel MergeTrace)
forall a' a. (a' -> a) -> Tracer m a -> Tracer m a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
`contramap` Table m h -> Tracer m TableTrace
forall (m :: * -> *) h. Table m h -> Tracer m TableTrace
tableTracer Table m h
t)
            TableConfig
conf
            ResolveSerialisedValue
resolve
            HasFS m h
hfs
            (TableEnv m h -> HasBlockIO m h
forall (m :: * -> *) h. TableEnv m h -> HasBlockIO m h
tableHasBlockIO TableEnv m h
tEnv)
            (TableEnv m h -> SessionRoot
forall (m :: * -> *) h. TableEnv m h -> SessionRoot
tableSessionRoot TableEnv m h
tEnv)
            (TableEnv m h -> UniqCounter m
forall (m :: * -> *) h. TableEnv m h -> UniqCounter m
tableSessionUniqCounter TableEnv m h
tEnv)
            Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob)
es
            ActionRegistry m
reg

{-------------------------------------------------------------------------------
  Blobs
-------------------------------------------------------------------------------}

{- | A 'BlobRef' used with 'retrieveBlobs' was invalid.

'BlobRef's are obtained from lookups in a 'Table', but they may be
invalidated by subsequent changes in that 'Table'. In general the
reliable way to retrieve blobs is not to change the 'Table' before
retrieving the blobs. To allow later retrievals, duplicate the table
before making modifications and keep the table open until all blob
retrievals are complete.
-}
data BlobRefInvalidError
    = -- | The 'Int' index indicates the first invalid 'BlobRef'.
      ErrBlobRefInvalid !Int
    deriving stock (Int -> BlobRefInvalidError -> ShowS
[BlobRefInvalidError] -> ShowS
BlobRefInvalidError -> String
(Int -> BlobRefInvalidError -> ShowS)
-> (BlobRefInvalidError -> String)
-> ([BlobRefInvalidError] -> ShowS)
-> Show BlobRefInvalidError
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> BlobRefInvalidError -> ShowS
showsPrec :: Int -> BlobRefInvalidError -> ShowS
$cshow :: BlobRefInvalidError -> String
show :: BlobRefInvalidError -> String
$cshowList :: [BlobRefInvalidError] -> ShowS
showList :: [BlobRefInvalidError] -> ShowS
Show, BlobRefInvalidError -> BlobRefInvalidError -> Bool
(BlobRefInvalidError -> BlobRefInvalidError -> Bool)
-> (BlobRefInvalidError -> BlobRefInvalidError -> Bool)
-> Eq BlobRefInvalidError
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: BlobRefInvalidError -> BlobRefInvalidError -> Bool
== :: BlobRefInvalidError -> BlobRefInvalidError -> Bool
$c/= :: BlobRefInvalidError -> BlobRefInvalidError -> Bool
/= :: BlobRefInvalidError -> BlobRefInvalidError -> Bool
Eq)
    deriving anyclass (Show BlobRefInvalidError
Typeable BlobRefInvalidError
(Typeable BlobRefInvalidError, Show BlobRefInvalidError) =>
(BlobRefInvalidError -> SomeException)
-> (SomeException -> Maybe BlobRefInvalidError)
-> (BlobRefInvalidError -> String)
-> Exception BlobRefInvalidError
SomeException -> Maybe BlobRefInvalidError
BlobRefInvalidError -> String
BlobRefInvalidError -> SomeException
forall e.
(Typeable e, Show e) =>
(e -> SomeException)
-> (SomeException -> Maybe e) -> (e -> String) -> Exception e
$ctoException :: BlobRefInvalidError -> SomeException
toException :: BlobRefInvalidError -> SomeException
$cfromException :: SomeException -> Maybe BlobRefInvalidError
fromException :: SomeException -> Maybe BlobRefInvalidError
$cdisplayException :: BlobRefInvalidError -> String
displayException :: BlobRefInvalidError -> String
Exception)

{-# SPECIALISE retrieveBlobs ::
     Session IO h
  -> V.Vector (WeakBlobRef IO h)
  -> IO (V.Vector SerialisedBlob) #-}
retrieveBlobs ::
     (MonadMask m, MonadST m, MonadSTM m)
  => Session m h
  -> V.Vector (WeakBlobRef m h)
  -> m (V.Vector SerialisedBlob)
retrieveBlobs :: forall (m :: * -> *) h.
(MonadMask m, MonadST m, MonadSTM m) =>
Session m h
-> Vector (WeakBlobRef m h) -> m (Vector SerialisedBlob)
retrieveBlobs Session m h
sesh Vector (WeakBlobRef m h)
wrefs =
    Session m h
-> (SessionEnv m h -> m (Vector SerialisedBlob))
-> m (Vector SerialisedBlob)
forall (m :: * -> *) h a.
(MonadSTM m, MonadThrow m) =>
Session m h -> (SessionEnv m h -> m a) -> m a
withOpenSession Session m h
sesh ((SessionEnv m h -> m (Vector SerialisedBlob))
 -> m (Vector SerialisedBlob))
-> (SessionEnv m h -> m (Vector SerialisedBlob))
-> m (Vector SerialisedBlob)
forall a b. (a -> b) -> a -> b
$ \SessionEnv m h
seshEnv ->
      let hbio :: HasBlockIO m h
hbio = SessionEnv m h -> HasBlockIO m h
forall (m :: * -> *) h. SessionEnv m h -> HasBlockIO m h
sessionHasBlockIO SessionEnv m h
seshEnv in
      (WeakBlobRefInvalid -> m (Vector SerialisedBlob))
-> m (Vector SerialisedBlob) -> m (Vector SerialisedBlob)
forall e a. Exception e => (e -> m a) -> m a -> m a
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
(e -> m a) -> m a -> m a
handle (\(BlobRef.WeakBlobRefInvalid Int
i) ->
                BlobRefInvalidError -> m (Vector SerialisedBlob)
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (Int -> BlobRefInvalidError
ErrBlobRefInvalid Int
i)) (m (Vector SerialisedBlob) -> m (Vector SerialisedBlob))
-> m (Vector SerialisedBlob) -> m (Vector SerialisedBlob)
forall a b. (a -> b) -> a -> b
$
      HasBlockIO m h
-> Vector (WeakBlobRef m h) -> m (Vector SerialisedBlob)
forall (m :: * -> *) h.
(MonadMask m, PrimMonad m) =>
HasBlockIO m h
-> Vector (WeakBlobRef m h) -> m (Vector SerialisedBlob)
BlobRef.readWeakBlobRefs HasBlockIO m h
hbio Vector (WeakBlobRef m h)
wrefs

{-------------------------------------------------------------------------------
  Cursors
-------------------------------------------------------------------------------}

-- | A read-only view into the table state at the time of cursor creation.
--
-- For more information, see 'Database.LSMTree.Normal.Cursor'.
--
-- The representation of a cursor is similar to that of a 'Table', but
-- simpler, as it is read-only.
data Cursor m h = Cursor {
      -- | Mutual exclusion, only a single thread can read from a cursor at a
      -- given time.
      forall (m :: * -> *) h.
Cursor m h -> StrictMVar m (CursorState m h)
cursorState  :: !(StrictMVar m (CursorState m h))
    , forall (m :: * -> *) h. Cursor m h -> Tracer m CursorTrace
cursorTracer :: !(Tracer m CursorTrace)
    }

instance NFData (Cursor m h) where
  rnf :: Cursor m h -> ()
rnf (Cursor StrictMVar m (CursorState m h)
a Tracer m CursorTrace
b) = StrictMVar m (CursorState m h) -> ()
forall a. a -> ()
rwhnf StrictMVar m (CursorState m h)
a () -> () -> ()
forall a b. a -> b -> b
`seq` Tracer m CursorTrace -> ()
forall a. a -> ()
rwhnf Tracer m CursorTrace
b

data CursorState m h =
    CursorOpen !(CursorEnv m h)
  | CursorClosed  -- ^ Calls to a closed cursor raise an exception.

data CursorEnv m h = CursorEnv {
    -- === Session-inherited

    -- | The session that this cursor belongs to.
    --
    -- NOTE: Consider using the 'cursorSessionEnv' field instead of acquiring
    -- the session lock.
    forall (m :: * -> *) h. CursorEnv m h -> Session m h
cursorSession    :: !(Session m h)
    -- | Use this instead of 'cursorSession' for easy access. An open cursor may
    -- assume that its session is open. A session's global resources, and
    -- therefore resources that are inherited by the cursor, will only be
    -- released once the session is sure that no cursors are open anymore.
  , forall (m :: * -> *) h. CursorEnv m h -> SessionEnv m h
cursorSessionEnv :: !(SessionEnv m h)

    -- === Cursor-specific

    -- | Session-unique identifier for this cursor.
  , forall (m :: * -> *) h. CursorEnv m h -> CursorId
cursorId         :: !CursorId
    -- | Readers are immediately discarded once they are 'Readers.Drained',
    -- so if there is a 'Just', we can assume that it has further entries.
    -- Note that, while the readers do retain references on the blob files
    -- while they are active, once they are drained they do not. This could
    -- invalidate any 'BlobRef's previously handed out. To avoid this, we
    -- explicitly retain references on the runs and write buffer blofs and
    -- only release them when the cursor is closed (see cursorRuns and
    -- cursorWBB below).
  , forall (m :: * -> *) h. CursorEnv m h -> Maybe (Readers m h)
cursorReaders    :: !(Maybe (Readers.Readers m h))

    --TODO: the cursorRuns and cursorWBB could be replaced by just retaining
    -- the BlobFile from the runs and WBB, so that we retain less. Since we
    -- only retain these to keep BlobRefs valid until the cursor is closed.
    -- Alternatively: the Readers could be modified to keep the BlobFiles even
    -- once the readers are drained, and only release them when the Readers is
    -- itself closed.

    -- | The runs held open by the cursor. We must release these references
    -- when the cursor gets closed.
  , forall (m :: * -> *) h. CursorEnv m h -> Vector (Ref (Run m h))
cursorRuns       :: !(V.Vector (Ref (Run m h)))

    -- | The write buffer blobs, which like the runs, we have to keep open
    -- untile the cursor is closed.
  , forall (m :: * -> *) h. CursorEnv m h -> Ref (WriteBufferBlobs m h)
cursorWBB        :: !(Ref (WBB.WriteBufferBlobs m h))
  }

{-# SPECIALISE withCursor ::
     OffsetKey
  -> Table IO h
  -> (Cursor IO h -> IO a)
  -> IO a #-}
-- | See 'Database.LSMTree.Normal.withCursor'.
withCursor ::
     (MonadMask m, MonadMVar m, MonadST m, MonadSTM m)
  => OffsetKey
  -> Table m h
  -> (Cursor m h -> m a)
  -> m a
withCursor :: forall (m :: * -> *) h a.
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m) =>
OffsetKey -> Table m h -> (Cursor m h -> m a) -> m a
withCursor OffsetKey
offsetKey Table m h
t = m (Cursor m h)
-> (Cursor m h -> m ()) -> (Cursor m h -> m a) -> m a
forall a b c. m a -> (a -> m b) -> (a -> m c) -> m c
forall (m :: * -> *) a b c.
MonadThrow m =>
m a -> (a -> m b) -> (a -> m c) -> m c
bracket (OffsetKey -> Table m h -> m (Cursor m h)
forall (m :: * -> *) h.
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m) =>
OffsetKey -> Table m h -> m (Cursor m h)
newCursor OffsetKey
offsetKey Table m h
t) Cursor m h -> m ()
forall (m :: * -> *) h.
(MonadMask m, MonadMVar m, MonadSTM m, PrimMonad m) =>
Cursor m h -> m ()
closeCursor

{-# SPECIALISE newCursor ::
     OffsetKey
  -> Table IO h
  -> IO (Cursor IO h) #-}
-- | See 'Database.LSMTree.Normal.newCursor'.
newCursor ::
     (MonadMask m, MonadMVar m, MonadST m, MonadSTM m)
  => OffsetKey
  -> Table m h
  -> m (Cursor m h)
newCursor :: forall (m :: * -> *) h.
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m) =>
OffsetKey -> Table m h -> m (Cursor m h)
newCursor !OffsetKey
offsetKey Table m h
t = Table m h -> (TableEnv m h -> m (Cursor m h)) -> m (Cursor m h)
forall (m :: * -> *) h a.
(MonadSTM m, MonadThrow m) =>
Table m h -> (TableEnv m h -> m a) -> m a
withOpenTable Table m h
t ((TableEnv m h -> m (Cursor m h)) -> m (Cursor m h))
-> (TableEnv m h -> m (Cursor m h)) -> m (Cursor m h)
forall a b. (a -> b) -> a -> b
$ \TableEnv m h
tEnv -> do
    let cursorSession :: Session m h
cursorSession = Table m h -> Session m h
forall (m :: * -> *) h. Table m h -> Session m h
tableSession Table m h
t
    let cursorSessionEnv :: SessionEnv m h
cursorSessionEnv = TableEnv m h -> SessionEnv m h
forall (m :: * -> *) h. TableEnv m h -> SessionEnv m h
tableSessionEnv TableEnv m h
tEnv
    CursorId
cursorId <- Unique -> CursorId
uniqueToCursorId (Unique -> CursorId) -> m Unique -> m CursorId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
      UniqCounter m -> m Unique
forall (m :: * -> *). PrimMonad m => UniqCounter m -> m Unique
incrUniqCounter (SessionEnv m h -> UniqCounter m
forall (m :: * -> *) h. SessionEnv m h -> UniqCounter m
sessionUniqCounter SessionEnv m h
cursorSessionEnv)
    let cursorTracer :: Tracer m CursorTrace
cursorTracer = CursorId -> CursorTrace -> LSMTreeTrace
TraceCursor CursorId
cursorId (CursorTrace -> LSMTreeTrace)
-> Tracer m LSMTreeTrace -> Tracer m CursorTrace
forall a' a. (a' -> a) -> Tracer m a -> Tracer m a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
`contramap` Session m h -> Tracer m LSMTreeTrace
forall (m :: * -> *) h. Session m h -> Tracer m LSMTreeTrace
sessionTracer Session m h
cursorSession
    Tracer m CursorTrace -> CursorTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith Tracer m CursorTrace
cursorTracer (CursorTrace -> m ()) -> CursorTrace -> m ()
forall a b. (a -> b) -> a -> b
$ TableId -> CursorTrace
TraceCreateCursor (Table m h -> TableId
forall (m :: * -> *) h. Table m h -> TableId
tableId Table m h
t)

    -- We acquire a read-lock on the session open-state to prevent races, see
    -- 'sessionOpenTables'.
    Session m h -> (SessionEnv m h -> m (Cursor m h)) -> m (Cursor m h)
forall (m :: * -> *) h a.
(MonadSTM m, MonadThrow m) =>
Session m h -> (SessionEnv m h -> m a) -> m a
withOpenSession Session m h
cursorSession ((SessionEnv m h -> m (Cursor m h)) -> m (Cursor m h))
-> (SessionEnv m h -> m (Cursor m h)) -> m (Cursor m h)
forall a b. (a -> b) -> a -> b
$ \SessionEnv m h
_ -> do
      (ActionRegistry m -> m (Cursor m h)) -> m (Cursor m h)
forall (m :: * -> *) a.
(PrimMonad m, MonadCatch m) =>
(ActionRegistry m -> m a) -> m a
withActionRegistry ((ActionRegistry m -> m (Cursor m h)) -> m (Cursor m h))
-> (ActionRegistry m -> m (Cursor m h)) -> m (Cursor m h)
forall a b. (a -> b) -> a -> b
$ \ActionRegistry m
reg -> do
        (WriteBuffer
wb, Ref (WriteBufferBlobs m h)
wbblobs, Vector (Ref (Run m h))
cursorRuns) <- ActionRegistry m
-> RWVar m (TableContent m h)
-> m (WriteBuffer, Ref (WriteBufferBlobs m h),
      Vector (Ref (Run m h)))
forall {m :: * -> *} {h}.
(MonadMask m, PrimMonad m, MonadSTM m) =>
ActionRegistry m
-> RWVar m (TableContent m h)
-> m (WriteBuffer, Ref (WriteBufferBlobs m h),
      Vector (Ref (Run m h)))
dupTableContent ActionRegistry m
reg (TableEnv m h -> RWVar m (TableContent m h)
forall (m :: * -> *) h. TableEnv m h -> RWVar m (TableContent m h)
tableContent TableEnv m h
tEnv)
        Maybe (Readers m h)
cursorReaders <-
          ActionRegistry m
-> m (Maybe (Readers m h))
-> (Readers m h -> m ())
-> m (Maybe (Readers m h))
forall (m :: * -> *) a.
(PrimMonad m, MonadMask m, HasCallStack) =>
ActionRegistry m -> m (Maybe a) -> (a -> m ()) -> m (Maybe a)
withRollbackMaybe ActionRegistry m
reg
            (OffsetKey
-> Maybe (WriteBuffer, Ref (WriteBufferBlobs m h))
-> Vector (Ref (Run m h))
-> m (Maybe (Readers m h))
forall (m :: * -> *) h.
(MonadMask m, MonadST m, MonadSTM m) =>
OffsetKey
-> Maybe (WriteBuffer, Ref (WriteBufferBlobs m h))
-> Vector (Ref (Run m h))
-> m (Maybe (Readers m h))
Readers.new OffsetKey
offsetKey ((WriteBuffer, Ref (WriteBufferBlobs m h))
-> Maybe (WriteBuffer, Ref (WriteBufferBlobs m h))
forall a. a -> Maybe a
Just (WriteBuffer
wb, Ref (WriteBufferBlobs m h)
wbblobs)) Vector (Ref (Run m h))
cursorRuns)
            Readers m h -> m ()
forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, PrimMonad m) =>
Readers m h -> m ()
Readers.close
        let cursorWBB :: Ref (WriteBufferBlobs m h)
cursorWBB = Ref (WriteBufferBlobs m h)
wbblobs
        StrictMVar m (CursorState m h)
cursorState <- CursorState m h -> m (StrictMVar m (CursorState m h))
forall (m :: * -> *) a. MonadMVar m => a -> m (StrictMVar m a)
newMVar (CursorEnv m h -> CursorState m h
forall (m :: * -> *) h. CursorEnv m h -> CursorState m h
CursorOpen CursorEnv {Maybe (Readers m h)
Ref (WriteBufferBlobs m h)
Vector (Ref (Run m h))
CursorId
SessionEnv m h
Session m h
cursorSessionEnv :: SessionEnv m h
cursorSession :: Session m h
cursorId :: CursorId
cursorReaders :: Maybe (Readers m h)
cursorRuns :: Vector (Ref (Run m h))
cursorWBB :: Ref (WriteBufferBlobs m h)
cursorSession :: Session m h
cursorSessionEnv :: SessionEnv m h
cursorId :: CursorId
cursorRuns :: Vector (Ref (Run m h))
cursorReaders :: Maybe (Readers m h)
cursorWBB :: Ref (WriteBufferBlobs m h)
..})
        let !cursor :: Cursor m h
cursor = Cursor {StrictMVar m (CursorState m h)
cursorState :: StrictMVar m (CursorState m h)
cursorState :: StrictMVar m (CursorState m h)
cursorState, Tracer m CursorTrace
cursorTracer :: Tracer m CursorTrace
cursorTracer :: Tracer m CursorTrace
cursorTracer}
        -- Track cursor, but careful: If now an exception is raised, all
        -- resources get freed by the registry, so if the session still
        -- tracks 'cursor' (which is 'CursorOpen'), it later double frees.
        -- Therefore, we only track the cursor if 'withActionRegistry' exits
        -- successfully, i.e. using 'delayedCommit'.
        ActionRegistry m -> m () -> m ()
forall (m :: * -> *).
(PrimMonad m, HasCallStack) =>
ActionRegistry m -> m () -> m ()
delayedCommit ActionRegistry m
reg (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
          StrictMVar m (Map CursorId (Cursor m h))
-> (Map CursorId (Cursor m h) -> m (Map CursorId (Cursor m h)))
-> m ()
forall (m :: * -> *) a.
MonadMVar m =>
StrictMVar m a -> (a -> m a) -> m ()
modifyMVar_ (SessionEnv m h -> StrictMVar m (Map CursorId (Cursor m h))
forall (m :: * -> *) h.
SessionEnv m h -> StrictMVar m (Map CursorId (Cursor m h))
sessionOpenCursors SessionEnv m h
cursorSessionEnv) ((Map CursorId (Cursor m h) -> m (Map CursorId (Cursor m h)))
 -> m ())
-> (Map CursorId (Cursor m h) -> m (Map CursorId (Cursor m h)))
-> m ()
forall a b. (a -> b) -> a -> b
$
            Map CursorId (Cursor m h) -> m (Map CursorId (Cursor m h))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Map CursorId (Cursor m h) -> m (Map CursorId (Cursor m h)))
-> (Map CursorId (Cursor m h) -> Map CursorId (Cursor m h))
-> Map CursorId (Cursor m h)
-> m (Map CursorId (Cursor m h))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. CursorId
-> Cursor m h
-> Map CursorId (Cursor m h)
-> Map CursorId (Cursor m h)
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert CursorId
cursorId Cursor m h
cursor
        Cursor m h -> m (Cursor m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Cursor m h -> m (Cursor m h)) -> Cursor m h -> m (Cursor m h)
forall a b. (a -> b) -> a -> b
$! Cursor m h
cursor
  where
    -- The table contents escape the read access, but we just duplicate
    -- references to each run, so it is safe.
    dupTableContent :: ActionRegistry m
-> RWVar m (TableContent m h)
-> m (WriteBuffer, Ref (WriteBufferBlobs m h),
      Vector (Ref (Run m h)))
dupTableContent ActionRegistry m
reg RWVar m (TableContent m h)
contentVar = do
        RWVar m (TableContent m h)
-> (TableContent m h
    -> m (WriteBuffer, Ref (WriteBufferBlobs m h),
          Vector (Ref (Run m h))))
-> m (WriteBuffer, Ref (WriteBufferBlobs m h),
      Vector (Ref (Run m h)))
forall (m :: * -> *) a b.
(MonadSTM m, MonadThrow m) =>
RWVar m a -> (a -> m b) -> m b
RW.withReadAccess RWVar m (TableContent m h)
contentVar ((TableContent m h
  -> m (WriteBuffer, Ref (WriteBufferBlobs m h),
        Vector (Ref (Run m h))))
 -> m (WriteBuffer, Ref (WriteBufferBlobs m h),
       Vector (Ref (Run m h))))
-> (TableContent m h
    -> m (WriteBuffer, Ref (WriteBufferBlobs m h),
          Vector (Ref (Run m h))))
-> m (WriteBuffer, Ref (WriteBufferBlobs m h),
      Vector (Ref (Run m h)))
forall a b. (a -> b) -> a -> b
$ \TableContent m h
content -> do
          let !wb :: WriteBuffer
wb      = TableContent m h -> WriteBuffer
forall (m :: * -> *) h. TableContent m h -> WriteBuffer
tableWriteBuffer TableContent m h
content
              !wbblobs :: Ref (WriteBufferBlobs m h)
wbblobs = TableContent m h -> Ref (WriteBufferBlobs m h)
forall (m :: * -> *) h.
TableContent m h -> Ref (WriteBufferBlobs m h)
tableWriteBufferBlobs TableContent m h
content
          Ref (WriteBufferBlobs m h)
wbblobs' <- ActionRegistry m
-> m (Ref (WriteBufferBlobs m h))
-> (Ref (WriteBufferBlobs m h) -> m ())
-> m (Ref (WriteBufferBlobs m h))
forall (m :: * -> *) a.
(PrimMonad m, MonadMask m, HasCallStack) =>
ActionRegistry m -> m a -> (a -> m ()) -> m a
withRollback ActionRegistry m
reg (Ref (WriteBufferBlobs m h) -> m (Ref (WriteBufferBlobs m h))
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadThrow m, HasCallStack) =>
Ref obj -> m (Ref obj)
dupRef Ref (WriteBufferBlobs m h)
wbblobs) Ref (WriteBufferBlobs m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef
          let runs :: Vector (Ref (Run m h))
runs = LevelsCache m h -> Vector (Ref (Run m h))
forall (m :: * -> *) h. LevelsCache m h -> Vector (Ref (Run m h))
cachedRuns (TableContent m h -> LevelsCache m h
forall (m :: * -> *) h. TableContent m h -> LevelsCache m h
tableCache TableContent m h
content)
          Vector (Ref (Run m h))
runs' <- Vector (Ref (Run m h))
-> (Ref (Run m h) -> m (Ref (Run m h)))
-> m (Vector (Ref (Run m h)))
forall (m :: * -> *) a b.
Monad m =>
Vector a -> (a -> m b) -> m (Vector b)
V.forM Vector (Ref (Run m h))
runs ((Ref (Run m h) -> m (Ref (Run m h)))
 -> m (Vector (Ref (Run m h))))
-> (Ref (Run m h) -> m (Ref (Run m h)))
-> m (Vector (Ref (Run m h)))
forall a b. (a -> b) -> a -> b
$ \Ref (Run m h)
r ->
                     ActionRegistry m
-> m (Ref (Run m h))
-> (Ref (Run m h) -> m ())
-> m (Ref (Run m h))
forall (m :: * -> *) a.
(PrimMonad m, MonadMask m, HasCallStack) =>
ActionRegistry m -> m a -> (a -> m ()) -> m a
withRollback ActionRegistry m
reg (Ref (Run m h) -> m (Ref (Run m h))
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadThrow m, HasCallStack) =>
Ref obj -> m (Ref obj)
dupRef Ref (Run m h)
r) Ref (Run m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef
          (WriteBuffer, Ref (WriteBufferBlobs m h), Vector (Ref (Run m h)))
-> m (WriteBuffer, Ref (WriteBufferBlobs m h),
      Vector (Ref (Run m h)))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (WriteBuffer
wb, Ref (WriteBufferBlobs m h)
wbblobs', Vector (Ref (Run m h))
runs')

{-# SPECIALISE closeCursor :: Cursor IO h -> IO () #-}
-- | See 'Database.LSMTree.Normal.closeCursor'.
closeCursor ::
     (MonadMask m, MonadMVar m, MonadSTM m, PrimMonad m)
  => Cursor m h
  -> m ()
closeCursor :: forall (m :: * -> *) h.
(MonadMask m, MonadMVar m, MonadSTM m, PrimMonad m) =>
Cursor m h -> m ()
closeCursor Cursor {Tracer m CursorTrace
StrictMVar m (CursorState m h)
cursorState :: forall (m :: * -> *) h.
Cursor m h -> StrictMVar m (CursorState m h)
cursorTracer :: forall (m :: * -> *) h. Cursor m h -> Tracer m CursorTrace
cursorState :: StrictMVar m (CursorState m h)
cursorTracer :: Tracer m CursorTrace
..} = do
    Tracer m CursorTrace -> CursorTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith Tracer m CursorTrace
cursorTracer (CursorTrace -> m ()) -> CursorTrace -> m ()
forall a b. (a -> b) -> a -> b
$ CursorTrace
TraceCloseCursor
    m (CursorState m h)
-> (CursorState m h -> m ())
-> (ActionRegistry m -> CursorState m h -> m (CursorState m h))
-> m ()
forall (m :: * -> *) st.
(PrimMonad m, MonadCatch m) =>
m st -> (st -> m ()) -> (ActionRegistry m -> st -> m st) -> m ()
modifyWithActionRegistry_ (StrictMVar m (CursorState m h) -> m (CursorState m h)
forall (m :: * -> *) a. MonadMVar m => StrictMVar m a -> m a
takeMVar StrictMVar m (CursorState m h)
cursorState) (StrictMVar m (CursorState m h) -> CursorState m h -> m ()
forall (m :: * -> *) a. MonadMVar m => StrictMVar m a -> a -> m ()
putMVar StrictMVar m (CursorState m h)
cursorState) ((ActionRegistry m -> CursorState m h -> m (CursorState m h))
 -> m ())
-> (ActionRegistry m -> CursorState m h -> m (CursorState m h))
-> m ()
forall a b. (a -> b) -> a -> b
$ \ActionRegistry m
reg -> \case
      CursorState m h
CursorClosed -> CursorState m h -> m (CursorState m h)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return CursorState m h
forall (m :: * -> *) h. CursorState m h
CursorClosed
      CursorOpen CursorEnv {Maybe (Readers m h)
Ref (WriteBufferBlobs m h)
Vector (Ref (Run m h))
CursorId
SessionEnv m h
Session m h
cursorSessionEnv :: forall (m :: * -> *) h. CursorEnv m h -> SessionEnv m h
cursorSession :: forall (m :: * -> *) h. CursorEnv m h -> Session m h
cursorId :: forall (m :: * -> *) h. CursorEnv m h -> CursorId
cursorReaders :: forall (m :: * -> *) h. CursorEnv m h -> Maybe (Readers m h)
cursorRuns :: forall (m :: * -> *) h. CursorEnv m h -> Vector (Ref (Run m h))
cursorWBB :: forall (m :: * -> *) h. CursorEnv m h -> Ref (WriteBufferBlobs m h)
cursorSession :: Session m h
cursorSessionEnv :: SessionEnv m h
cursorId :: CursorId
cursorReaders :: Maybe (Readers m h)
cursorRuns :: Vector (Ref (Run m h))
cursorWBB :: Ref (WriteBufferBlobs m h)
..} -> do
        -- This should be safe-ish, but it's still not ideal, because it doesn't
        -- rule out sync exceptions in the cleanup operations.
        -- In that case, the cursor ends up closed, but resources might not have
        -- been freed. Probably better than the other way around, though.
        ActionRegistry m -> m () -> m ()
forall (m :: * -> *).
(PrimMonad m, HasCallStack) =>
ActionRegistry m -> m () -> m ()
delayedCommit ActionRegistry m
reg (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
          StrictMVar m (Map CursorId (Cursor m h))
-> (Map CursorId (Cursor m h) -> m (Map CursorId (Cursor m h)))
-> m ()
forall (m :: * -> *) a.
MonadMVar m =>
StrictMVar m a -> (a -> m a) -> m ()
modifyMVar_ (SessionEnv m h -> StrictMVar m (Map CursorId (Cursor m h))
forall (m :: * -> *) h.
SessionEnv m h -> StrictMVar m (Map CursorId (Cursor m h))
sessionOpenCursors SessionEnv m h
cursorSessionEnv) ((Map CursorId (Cursor m h) -> m (Map CursorId (Cursor m h)))
 -> m ())
-> (Map CursorId (Cursor m h) -> m (Map CursorId (Cursor m h)))
-> m ()
forall a b. (a -> b) -> a -> b
$
            Map CursorId (Cursor m h) -> m (Map CursorId (Cursor m h))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Map CursorId (Cursor m h) -> m (Map CursorId (Cursor m h)))
-> (Map CursorId (Cursor m h) -> Map CursorId (Cursor m h))
-> Map CursorId (Cursor m h)
-> m (Map CursorId (Cursor m h))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. CursorId -> Map CursorId (Cursor m h) -> Map CursorId (Cursor m h)
forall k a. Ord k => k -> Map k a -> Map k a
Map.delete CursorId
cursorId

        Maybe (Readers m h) -> (Readers m h -> m ()) -> m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ Maybe (Readers m h)
cursorReaders ((Readers m h -> m ()) -> m ()) -> (Readers m h -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ ActionRegistry m -> m () -> m ()
forall (m :: * -> *).
(PrimMonad m, HasCallStack) =>
ActionRegistry m -> m () -> m ()
delayedCommit ActionRegistry m
reg (m () -> m ()) -> (Readers m h -> m ()) -> Readers m h -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Readers m h -> m ()
forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, PrimMonad m) =>
Readers m h -> m ()
Readers.close
        Vector (Ref (Run m h)) -> (Ref (Run m h) -> m ()) -> m ()
forall (m :: * -> *) a b. Monad m => Vector a -> (a -> m b) -> m ()
V.forM_ Vector (Ref (Run m h))
cursorRuns ((Ref (Run m h) -> m ()) -> m ())
-> (Ref (Run m h) -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ ActionRegistry m -> m () -> m ()
forall (m :: * -> *).
(PrimMonad m, HasCallStack) =>
ActionRegistry m -> m () -> m ()
delayedCommit ActionRegistry m
reg (m () -> m ()) -> (Ref (Run m h) -> m ()) -> Ref (Run m h) -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Ref (Run m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef
        ActionRegistry m -> m () -> m ()
forall (m :: * -> *).
(PrimMonad m, HasCallStack) =>
ActionRegistry m -> m () -> m ()
delayedCommit ActionRegistry m
reg (Ref (WriteBufferBlobs m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef Ref (WriteBufferBlobs m h)
cursorWBB)
        CursorState m h -> m (CursorState m h)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return CursorState m h
forall (m :: * -> *) h. CursorState m h
CursorClosed

{-# SPECIALISE readCursor ::
     ResolveSerialisedValue
  -> Int
  -> Cursor IO h
  -> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef IO h) -> res)
  -> IO (V.Vector res) #-}
-- | See 'Database.LSMTree.Normal.readCursor'.
readCursor ::
     forall m h res.
     (MonadMask m, MonadMVar m, MonadST m, MonadSTM m)
  => ResolveSerialisedValue
  -> Int  -- ^ Maximum number of entries to read
  -> Cursor m h
  -> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef m h) -> res)
     -- ^ How to map to a query result, different for normal/monoidal
  -> m (V.Vector res)
readCursor :: forall (m :: * -> *) h res.
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m) =>
ResolveSerialisedValue
-> Int
-> Cursor m h
-> (SerialisedKey
    -> SerialisedValue -> Maybe (WeakBlobRef m h) -> res)
-> m (Vector res)
readCursor ResolveSerialisedValue
resolve Int
n Cursor m h
cursor SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef m h) -> res
fromEntry =
    ResolveSerialisedValue
-> (SerialisedKey -> Bool)
-> Int
-> Cursor m h
-> (SerialisedKey
    -> SerialisedValue -> Maybe (WeakBlobRef m h) -> res)
-> m (Vector res)
forall (m :: * -> *) h res.
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m) =>
ResolveSerialisedValue
-> (SerialisedKey -> Bool)
-> Int
-> Cursor m h
-> (SerialisedKey
    -> SerialisedValue -> Maybe (WeakBlobRef m h) -> res)
-> m (Vector res)
readCursorWhile ResolveSerialisedValue
resolve (Bool -> SerialisedKey -> Bool
forall a b. a -> b -> a
const Bool
True) Int
n Cursor m h
cursor SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef m h) -> res
fromEntry

-- | The cursor is closed.
data CursorClosedError
    = ErrCursorClosed
    deriving stock (Int -> CursorClosedError -> ShowS
[CursorClosedError] -> ShowS
CursorClosedError -> String
(Int -> CursorClosedError -> ShowS)
-> (CursorClosedError -> String)
-> ([CursorClosedError] -> ShowS)
-> Show CursorClosedError
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> CursorClosedError -> ShowS
showsPrec :: Int -> CursorClosedError -> ShowS
$cshow :: CursorClosedError -> String
show :: CursorClosedError -> String
$cshowList :: [CursorClosedError] -> ShowS
showList :: [CursorClosedError] -> ShowS
Show, CursorClosedError -> CursorClosedError -> Bool
(CursorClosedError -> CursorClosedError -> Bool)
-> (CursorClosedError -> CursorClosedError -> Bool)
-> Eq CursorClosedError
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: CursorClosedError -> CursorClosedError -> Bool
== :: CursorClosedError -> CursorClosedError -> Bool
$c/= :: CursorClosedError -> CursorClosedError -> Bool
/= :: CursorClosedError -> CursorClosedError -> Bool
Eq)
    deriving anyclass (Show CursorClosedError
Typeable CursorClosedError
(Typeable CursorClosedError, Show CursorClosedError) =>
(CursorClosedError -> SomeException)
-> (SomeException -> Maybe CursorClosedError)
-> (CursorClosedError -> String)
-> Exception CursorClosedError
SomeException -> Maybe CursorClosedError
CursorClosedError -> String
CursorClosedError -> SomeException
forall e.
(Typeable e, Show e) =>
(e -> SomeException)
-> (SomeException -> Maybe e) -> (e -> String) -> Exception e
$ctoException :: CursorClosedError -> SomeException
toException :: CursorClosedError -> SomeException
$cfromException :: SomeException -> Maybe CursorClosedError
fromException :: SomeException -> Maybe CursorClosedError
$cdisplayException :: CursorClosedError -> String
displayException :: CursorClosedError -> String
Exception)

{-# SPECIALISE readCursorWhile ::
     ResolveSerialisedValue
  -> (SerialisedKey -> Bool)
  -> Int
  -> Cursor IO h
  -> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef IO h) -> res)
  -> IO (V.Vector res) #-}
-- | @readCursorWhile _ p n cursor _@ reads elements until either:
--
--    * @n@ elements have been read already
--    * @p@ returns @False@ for the key of an entry to be read
--    * the cursor is drained
--
-- Consequently, once a call returned fewer than @n@ elements, any subsequent
-- calls with the same predicate @p@ will return an empty vector.
readCursorWhile ::
     forall m h res.
     (MonadMask m, MonadMVar m, MonadST m, MonadSTM m)
  => ResolveSerialisedValue
  -> (SerialisedKey -> Bool)  -- ^ Only read as long as this predicate holds
  -> Int  -- ^ Maximum number of entries to read
  -> Cursor m h
  -> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef m h) -> res)
     -- ^ How to map to a query result, different for normal/monoidal
  -> m (V.Vector res)
readCursorWhile :: forall (m :: * -> *) h res.
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m) =>
ResolveSerialisedValue
-> (SerialisedKey -> Bool)
-> Int
-> Cursor m h
-> (SerialisedKey
    -> SerialisedValue -> Maybe (WeakBlobRef m h) -> res)
-> m (Vector res)
readCursorWhile ResolveSerialisedValue
resolve SerialisedKey -> Bool
keyIsWanted Int
n Cursor {Tracer m CursorTrace
StrictMVar m (CursorState m h)
cursorState :: forall (m :: * -> *) h.
Cursor m h -> StrictMVar m (CursorState m h)
cursorTracer :: forall (m :: * -> *) h. Cursor m h -> Tracer m CursorTrace
cursorState :: StrictMVar m (CursorState m h)
cursorTracer :: Tracer m CursorTrace
..} SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef m h) -> res
fromEntry = do
    Tracer m CursorTrace -> CursorTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith Tracer m CursorTrace
cursorTracer (CursorTrace -> m ()) -> CursorTrace -> m ()
forall a b. (a -> b) -> a -> b
$ Int -> CursorTrace
TraceReadCursor Int
n
    StrictMVar m (CursorState m h)
-> (CursorState m h -> m (CursorState m h, Vector res))
-> m (Vector res)
forall (m :: * -> *) a b.
MonadMVar m =>
StrictMVar m a -> (a -> m (a, b)) -> m b
modifyMVar StrictMVar m (CursorState m h)
cursorState ((CursorState m h -> m (CursorState m h, Vector res))
 -> m (Vector res))
-> (CursorState m h -> m (CursorState m h, Vector res))
-> m (Vector res)
forall a b. (a -> b) -> a -> b
$ \case
      CursorState m h
CursorClosed -> CursorClosedError -> m (CursorState m h, Vector res)
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO CursorClosedError
ErrCursorClosed
      state :: CursorState m h
state@(CursorOpen CursorEnv m h
cursorEnv) -> do
        case CursorEnv m h -> Maybe (Readers m h)
forall (m :: * -> *) h. CursorEnv m h -> Maybe (Readers m h)
cursorReaders CursorEnv m h
cursorEnv of
          Maybe (Readers m h)
Nothing ->
            -- a drained cursor will just return an empty vector
            (CursorState m h, Vector res) -> m (CursorState m h, Vector res)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (CursorState m h
state, Vector res
forall a. Vector a
V.empty)
          Just Readers m h
readers -> do
            (Vector res
vec, HasMore
hasMore) <- ResolveSerialisedValue
-> (SerialisedKey -> Bool)
-> (SerialisedKey
    -> SerialisedValue -> Maybe (WeakBlobRef m h) -> res)
-> Readers m h
-> Int
-> m (Vector res, HasMore)
forall h (m :: * -> *) res.
(MonadMask m, MonadST m, MonadSTM m) =>
ResolveSerialisedValue
-> (SerialisedKey -> Bool)
-> (SerialisedKey
    -> SerialisedValue -> Maybe (WeakBlobRef m h) -> res)
-> Readers m h
-> Int
-> m (Vector res, HasMore)
Cursor.readEntriesWhile ResolveSerialisedValue
resolve SerialisedKey -> Bool
keyIsWanted SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef m h) -> res
fromEntry Readers m h
readers Int
n
            -- if we drained the readers, remove them from the state
            let !state' :: CursorState m h
state' = case HasMore
hasMore of
                  HasMore
Readers.HasMore -> CursorState m h
state
                  HasMore
Readers.Drained -> CursorEnv m h -> CursorState m h
forall (m :: * -> *) h. CursorEnv m h -> CursorState m h
CursorOpen (CursorEnv m h
cursorEnv {cursorReaders = Nothing})
            (CursorState m h, Vector res) -> m (CursorState m h, Vector res)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (CursorState m h
state', Vector res
vec)

{-------------------------------------------------------------------------------
  Snapshots
-------------------------------------------------------------------------------}

-- | The named snapshot already exists.
data SnapshotExistsError
    = ErrSnapshotExists !SnapshotName
    deriving stock (Int -> SnapshotExistsError -> ShowS
[SnapshotExistsError] -> ShowS
SnapshotExistsError -> String
(Int -> SnapshotExistsError -> ShowS)
-> (SnapshotExistsError -> String)
-> ([SnapshotExistsError] -> ShowS)
-> Show SnapshotExistsError
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> SnapshotExistsError -> ShowS
showsPrec :: Int -> SnapshotExistsError -> ShowS
$cshow :: SnapshotExistsError -> String
show :: SnapshotExistsError -> String
$cshowList :: [SnapshotExistsError] -> ShowS
showList :: [SnapshotExistsError] -> ShowS
Show, SnapshotExistsError -> SnapshotExistsError -> Bool
(SnapshotExistsError -> SnapshotExistsError -> Bool)
-> (SnapshotExistsError -> SnapshotExistsError -> Bool)
-> Eq SnapshotExistsError
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: SnapshotExistsError -> SnapshotExistsError -> Bool
== :: SnapshotExistsError -> SnapshotExistsError -> Bool
$c/= :: SnapshotExistsError -> SnapshotExistsError -> Bool
/= :: SnapshotExistsError -> SnapshotExistsError -> Bool
Eq)
    deriving anyclass (Show SnapshotExistsError
Typeable SnapshotExistsError
(Typeable SnapshotExistsError, Show SnapshotExistsError) =>
(SnapshotExistsError -> SomeException)
-> (SomeException -> Maybe SnapshotExistsError)
-> (SnapshotExistsError -> String)
-> Exception SnapshotExistsError
SomeException -> Maybe SnapshotExistsError
SnapshotExistsError -> String
SnapshotExistsError -> SomeException
forall e.
(Typeable e, Show e) =>
(e -> SomeException)
-> (SomeException -> Maybe e) -> (e -> String) -> Exception e
$ctoException :: SnapshotExistsError -> SomeException
toException :: SnapshotExistsError -> SomeException
$cfromException :: SomeException -> Maybe SnapshotExistsError
fromException :: SomeException -> Maybe SnapshotExistsError
$cdisplayException :: SnapshotExistsError -> String
displayException :: SnapshotExistsError -> String
Exception)

{-# SPECIALISE createSnapshot ::
     SnapshotName
  -> SnapshotLabel
  -> SnapshotTableType
  -> Table IO h
  -> IO () #-}
-- |  See 'Database.LSMTree.Normal.createSnapshot''.
createSnapshot ::
     (MonadMask m, MonadMVar m, MonadST m, MonadSTM m)
  => SnapshotName
  -> SnapshotLabel
  -> SnapshotTableType
  -> Table m h
  -> m ()
createSnapshot :: forall (m :: * -> *) h.
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m) =>
SnapshotName
-> SnapshotLabel -> SnapshotTableType -> Table m h -> m ()
createSnapshot SnapshotName
snap SnapshotLabel
label SnapshotTableType
tableType Table m h
t = do
    Tracer m TableTrace -> TableTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith (Table m h -> Tracer m TableTrace
forall (m :: * -> *) h. Table m h -> Tracer m TableTrace
tableTracer Table m h
t) (TableTrace -> m ()) -> TableTrace -> m ()
forall a b. (a -> b) -> a -> b
$ SnapshotName -> TableTrace
TraceSnapshot SnapshotName
snap
    Table m h -> (TableEnv m h -> m ()) -> m ()
forall (m :: * -> *) h a.
(MonadSTM m, MonadThrow m) =>
Table m h -> (TableEnv m h -> m a) -> m a
withOpenTable Table m h
t ((TableEnv m h -> m ()) -> m ()) -> (TableEnv m h -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \TableEnv m h
tEnv ->
      (ActionRegistry m -> m ()) -> m ()
forall (m :: * -> *) a.
(PrimMonad m, MonadCatch m) =>
(ActionRegistry m -> m a) -> m a
withActionRegistry ((ActionRegistry m -> m ()) -> m ())
-> (ActionRegistry m -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \ActionRegistry m
reg -> do -- TODO: use the action registry for all side effects
        let hfs :: HasFS m h
hfs  = TableEnv m h -> HasFS m h
forall (m :: * -> *) h. TableEnv m h -> HasFS m h
tableHasFS TableEnv m h
tEnv
            hbio :: HasBlockIO m h
hbio = TableEnv m h -> HasBlockIO m h
forall (m :: * -> *) h. TableEnv m h -> HasBlockIO m h
tableHasBlockIO TableEnv m h
tEnv
            activeUc :: UniqCounter m
activeUc = TableEnv m h -> UniqCounter m
forall (m :: * -> *) h. TableEnv m h -> UniqCounter m
tableSessionUniqCounter TableEnv m h
tEnv

        -- Guard that the snapshot does not exist already
        let snapDir :: NamedSnapshotDir
snapDir = SessionRoot -> SnapshotName -> NamedSnapshotDir
Paths.namedSnapshotDir (TableEnv m h -> SessionRoot
forall (m :: * -> *) h. TableEnv m h -> SessionRoot
tableSessionRoot TableEnv m h
tEnv) SnapshotName
snap
        Bool
snapshotExists <- SnapshotName -> SessionEnv m h -> m Bool
forall (m :: * -> *) h. SnapshotName -> SessionEnv m h -> m Bool
doesSnapshotDirExist SnapshotName
snap (TableEnv m h -> SessionEnv m h
forall (m :: * -> *) h. TableEnv m h -> SessionEnv m h
tableSessionEnv TableEnv m h
tEnv)
        if Bool
snapshotExists then
          SnapshotExistsError -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (SnapshotName -> SnapshotExistsError
ErrSnapshotExists SnapshotName
snap)
        else
          -- we assume the snapshots directory already exists, so we just have
          -- to create the directory for this specific snapshot.
          ActionRegistry m -> m () -> m () -> m ()
forall (m :: * -> *) a.
(PrimMonad m, MonadMask m, HasCallStack) =>
ActionRegistry m -> m a -> m () -> m a
withRollback_ ActionRegistry m
reg
            (HasFS m h -> HasCallStack => FsPath -> m ()
forall (m :: * -> *) h. HasFS m h -> HasCallStack => FsPath -> m ()
FS.createDirectory HasFS m h
hfs (NamedSnapshotDir -> FsPath
Paths.getNamedSnapshotDir NamedSnapshotDir
snapDir))
            (HasFS m h -> HasCallStack => FsPath -> m ()
forall (m :: * -> *) h. HasFS m h -> HasCallStack => FsPath -> m ()
FS.removeDirectoryRecursive HasFS m h
hfs (NamedSnapshotDir -> FsPath
Paths.getNamedSnapshotDir NamedSnapshotDir
snapDir))

        -- Duplicate references to the table content, so that resources do not disappear
        -- from under our feet while taking a snapshot. These references are released
        -- again after the snapshot files/directories are written.
        TableContent m h
content <- RWVar m (TableContent m h)
-> (TableContent m h -> m (TableContent m h))
-> m (TableContent m h)
forall (m :: * -> *) a b.
(MonadSTM m, MonadThrow m) =>
RWVar m a -> (a -> m b) -> m b
RW.withReadAccess (TableEnv m h -> RWVar m (TableContent m h)
forall (m :: * -> *) h. TableEnv m h -> RWVar m (TableContent m h)
tableContent TableEnv m h
tEnv) (ActionRegistry m -> TableContent m h -> m (TableContent m h)
forall (m :: * -> *) h.
(PrimMonad m, MonadMask m) =>
ActionRegistry m -> TableContent m h -> m (TableContent m h)
duplicateTableContent ActionRegistry m
reg)

        -- Fresh UniqCounter so that we start numbering from 0 in the named
        -- snapshot directory
        UniqCounter m
snapUc <- Int -> m (UniqCounter m)
forall (m :: * -> *). PrimMonad m => Int -> m (UniqCounter m)
newUniqCounter Int
0

        -- Snapshot the write buffer.
        let activeDir :: ActiveDir
activeDir = SessionRoot -> ActiveDir
Paths.activeDir (TableEnv m h -> SessionRoot
forall (m :: * -> *) h. TableEnv m h -> SessionRoot
tableSessionRoot TableEnv m h
tEnv)
        let wb :: WriteBuffer
wb = TableContent m h -> WriteBuffer
forall (m :: * -> *) h. TableContent m h -> WriteBuffer
tableWriteBuffer TableContent m h
content
        let wbb :: Ref (WriteBufferBlobs m h)
wbb = TableContent m h -> Ref (WriteBufferBlobs m h)
forall (m :: * -> *) h.
TableContent m h -> Ref (WriteBufferBlobs m h)
tableWriteBufferBlobs TableContent m h
content
        RunNumber
snapWriteBufferNumber <- WriteBufferFsPaths -> RunNumber
Paths.writeBufferNumber (WriteBufferFsPaths -> RunNumber)
-> m WriteBufferFsPaths -> m RunNumber
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
            HasFS m h
-> HasBlockIO m h
-> UniqCounter m
-> UniqCounter m
-> ActionRegistry m
-> ActiveDir
-> NamedSnapshotDir
-> WriteBuffer
-> Ref (WriteBufferBlobs m h)
-> m WriteBufferFsPaths
forall (m :: * -> *) h.
(MonadMVar m, MonadSTM m, MonadST m, MonadMask m) =>
HasFS m h
-> HasBlockIO m h
-> UniqCounter m
-> UniqCounter m
-> ActionRegistry m
-> ActiveDir
-> NamedSnapshotDir
-> WriteBuffer
-> Ref (WriteBufferBlobs m h)
-> m WriteBufferFsPaths
snapshotWriteBuffer HasFS m h
hfs HasBlockIO m h
hbio UniqCounter m
activeUc UniqCounter m
snapUc ActionRegistry m
reg ActiveDir
activeDir NamedSnapshotDir
snapDir WriteBuffer
wb Ref (WriteBufferBlobs m h)
wbb

        -- Convert to snapshot format
        SnapLevels (Ref (Run m h))
snapLevels <- Levels m h -> m (SnapLevels (Ref (Run m h)))
forall (m :: * -> *) h.
(PrimMonad m, MonadMVar m) =>
Levels m h -> m (SnapLevels (Ref (Run m h)))
toSnapLevels (TableContent m h -> Levels m h
forall (m :: * -> *) h. TableContent m h -> Levels m h
tableLevels TableContent m h
content)

        -- Hard link runs into the named snapshot directory
        SnapLevels SnapshotRun
snapLevels' <- (Ref (Run m h) -> m SnapshotRun)
-> SnapLevels (Ref (Run m h)) -> m (SnapLevels SnapshotRun)
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> SnapLevels a -> f (SnapLevels b)
traverse (HasFS m h
-> HasBlockIO m h
-> UniqCounter m
-> ActionRegistry m
-> NamedSnapshotDir
-> Ref (Run m h)
-> m SnapshotRun
forall (m :: * -> *) h.
(MonadMask m, PrimMonad m) =>
HasFS m h
-> HasBlockIO m h
-> UniqCounter m
-> ActionRegistry m
-> NamedSnapshotDir
-> Ref (Run m h)
-> m SnapshotRun
snapshotRun HasFS m h
hfs HasBlockIO m h
hbio UniqCounter m
snapUc ActionRegistry m
reg NamedSnapshotDir
snapDir) SnapLevels (Ref (Run m h))
snapLevels

        -- If a merging tree exists, do the same hard-linking for the runs within
        Maybe (SnapMergingTree SnapshotRun)
mTreeOpt <- case TableContent m h -> UnionLevel m h
forall (m :: * -> *) h. TableContent m h -> UnionLevel m h
tableUnionLevel TableContent m h
content of
          UnionLevel m h
NoUnion -> Maybe (SnapMergingTree SnapshotRun)
-> m (Maybe (SnapMergingTree SnapshotRun))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe (SnapMergingTree SnapshotRun)
forall a. Maybe a
Nothing
          Union Ref (MergingTree m h)
mTreeRef -> do
            SnapMergingTree (Ref (Run m h))
mTree <- Ref (MergingTree m h) -> m (SnapMergingTree (Ref (Run m h)))
forall (m :: * -> *) h.
(PrimMonad m, MonadMVar m) =>
Ref (MergingTree m h) -> m (SnapMergingTree (Ref (Run m h)))
toSnapMergingTree Ref (MergingTree m h)
mTreeRef
            SnapMergingTree SnapshotRun -> Maybe (SnapMergingTree SnapshotRun)
forall a. a -> Maybe a
Just (SnapMergingTree SnapshotRun
 -> Maybe (SnapMergingTree SnapshotRun))
-> m (SnapMergingTree SnapshotRun)
-> m (Maybe (SnapMergingTree SnapshotRun))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (Ref (Run m h) -> m SnapshotRun)
-> SnapMergingTree (Ref (Run m h))
-> m (SnapMergingTree SnapshotRun)
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> SnapMergingTree a -> f (SnapMergingTree b)
traverse (HasFS m h
-> HasBlockIO m h
-> UniqCounter m
-> ActionRegistry m
-> NamedSnapshotDir
-> Ref (Run m h)
-> m SnapshotRun
forall (m :: * -> *) h.
(MonadMask m, PrimMonad m) =>
HasFS m h
-> HasBlockIO m h
-> UniqCounter m
-> ActionRegistry m
-> NamedSnapshotDir
-> Ref (Run m h)
-> m SnapshotRun
snapshotRun HasFS m h
hfs HasBlockIO m h
hbio UniqCounter m
snapUc ActionRegistry m
reg NamedSnapshotDir
snapDir) SnapMergingTree (Ref (Run m h))
mTree

        ActionRegistry m -> TableContent m h -> m ()
forall (m :: * -> *) h.
(PrimMonad m, MonadMask m) =>
ActionRegistry m -> TableContent m h -> m ()
releaseTableContent ActionRegistry m
reg TableContent m h
content

        let snapMetaData :: SnapshotMetaData
snapMetaData = SnapshotLabel
-> SnapshotTableType
-> TableConfig
-> RunNumber
-> SnapLevels SnapshotRun
-> Maybe (SnapMergingTree SnapshotRun)
-> SnapshotMetaData
SnapshotMetaData
                SnapshotLabel
label
                SnapshotTableType
tableType
                (Table m h -> TableConfig
forall (m :: * -> *) h. Table m h -> TableConfig
tableConfig Table m h
t)
                RunNumber
snapWriteBufferNumber
                SnapLevels SnapshotRun
snapLevels'
                Maybe (SnapMergingTree SnapshotRun)
mTreeOpt
            SnapshotMetaDataFile FsPath
contentPath = NamedSnapshotDir -> SnapshotMetaDataFile
Paths.snapshotMetaDataFile NamedSnapshotDir
snapDir
            SnapshotMetaDataChecksumFile FsPath
checksumPath = NamedSnapshotDir -> SnapshotMetaDataChecksumFile
Paths.snapshotMetaDataChecksumFile NamedSnapshotDir
snapDir
        HasFS m h -> FsPath -> FsPath -> SnapshotMetaData -> m ()
forall (m :: * -> *) h.
MonadThrow m =>
HasFS m h -> FsPath -> FsPath -> SnapshotMetaData -> m ()
writeFileSnapshotMetaData HasFS m h
hfs FsPath
contentPath FsPath
checksumPath SnapshotMetaData
snapMetaData

        -- Make the directory and its contents durable.
        HasFS m h -> HasBlockIO m h -> FsPath -> m ()
forall (m :: * -> *) h.
MonadThrow m =>
HasFS m h -> HasBlockIO m h -> FsPath -> m ()
FS.synchroniseDirectoryRecursive HasFS m h
hfs HasBlockIO m h
hbio (NamedSnapshotDir -> FsPath
Paths.getNamedSnapshotDir NamedSnapshotDir
snapDir)

-- | The named snapshot does not exist.
data SnapshotDoesNotExistError
    = ErrSnapshotDoesNotExist !SnapshotName
    deriving stock (Int -> SnapshotDoesNotExistError -> ShowS
[SnapshotDoesNotExistError] -> ShowS
SnapshotDoesNotExistError -> String
(Int -> SnapshotDoesNotExistError -> ShowS)
-> (SnapshotDoesNotExistError -> String)
-> ([SnapshotDoesNotExistError] -> ShowS)
-> Show SnapshotDoesNotExistError
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> SnapshotDoesNotExistError -> ShowS
showsPrec :: Int -> SnapshotDoesNotExistError -> ShowS
$cshow :: SnapshotDoesNotExistError -> String
show :: SnapshotDoesNotExistError -> String
$cshowList :: [SnapshotDoesNotExistError] -> ShowS
showList :: [SnapshotDoesNotExistError] -> ShowS
Show, SnapshotDoesNotExistError -> SnapshotDoesNotExistError -> Bool
(SnapshotDoesNotExistError -> SnapshotDoesNotExistError -> Bool)
-> (SnapshotDoesNotExistError -> SnapshotDoesNotExistError -> Bool)
-> Eq SnapshotDoesNotExistError
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: SnapshotDoesNotExistError -> SnapshotDoesNotExistError -> Bool
== :: SnapshotDoesNotExistError -> SnapshotDoesNotExistError -> Bool
$c/= :: SnapshotDoesNotExistError -> SnapshotDoesNotExistError -> Bool
/= :: SnapshotDoesNotExistError -> SnapshotDoesNotExistError -> Bool
Eq)
    deriving anyclass (Show SnapshotDoesNotExistError
Typeable SnapshotDoesNotExistError
(Typeable SnapshotDoesNotExistError,
 Show SnapshotDoesNotExistError) =>
(SnapshotDoesNotExistError -> SomeException)
-> (SomeException -> Maybe SnapshotDoesNotExistError)
-> (SnapshotDoesNotExistError -> String)
-> Exception SnapshotDoesNotExistError
SomeException -> Maybe SnapshotDoesNotExistError
SnapshotDoesNotExistError -> String
SnapshotDoesNotExistError -> SomeException
forall e.
(Typeable e, Show e) =>
(e -> SomeException)
-> (SomeException -> Maybe e) -> (e -> String) -> Exception e
$ctoException :: SnapshotDoesNotExistError -> SomeException
toException :: SnapshotDoesNotExistError -> SomeException
$cfromException :: SomeException -> Maybe SnapshotDoesNotExistError
fromException :: SomeException -> Maybe SnapshotDoesNotExistError
$cdisplayException :: SnapshotDoesNotExistError -> String
displayException :: SnapshotDoesNotExistError -> String
Exception)

-- | The named snapshot is corrupted.
data SnapshotCorruptedError
    = ErrSnapshotCorrupted
        !SnapshotName
        !FileCorruptedError
    deriving stock (Int -> SnapshotCorruptedError -> ShowS
[SnapshotCorruptedError] -> ShowS
SnapshotCorruptedError -> String
(Int -> SnapshotCorruptedError -> ShowS)
-> (SnapshotCorruptedError -> String)
-> ([SnapshotCorruptedError] -> ShowS)
-> Show SnapshotCorruptedError
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> SnapshotCorruptedError -> ShowS
showsPrec :: Int -> SnapshotCorruptedError -> ShowS
$cshow :: SnapshotCorruptedError -> String
show :: SnapshotCorruptedError -> String
$cshowList :: [SnapshotCorruptedError] -> ShowS
showList :: [SnapshotCorruptedError] -> ShowS
Show, SnapshotCorruptedError -> SnapshotCorruptedError -> Bool
(SnapshotCorruptedError -> SnapshotCorruptedError -> Bool)
-> (SnapshotCorruptedError -> SnapshotCorruptedError -> Bool)
-> Eq SnapshotCorruptedError
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: SnapshotCorruptedError -> SnapshotCorruptedError -> Bool
== :: SnapshotCorruptedError -> SnapshotCorruptedError -> Bool
$c/= :: SnapshotCorruptedError -> SnapshotCorruptedError -> Bool
/= :: SnapshotCorruptedError -> SnapshotCorruptedError -> Bool
Eq)
    deriving anyclass (Show SnapshotCorruptedError
Typeable SnapshotCorruptedError
(Typeable SnapshotCorruptedError, Show SnapshotCorruptedError) =>
(SnapshotCorruptedError -> SomeException)
-> (SomeException -> Maybe SnapshotCorruptedError)
-> (SnapshotCorruptedError -> String)
-> Exception SnapshotCorruptedError
SomeException -> Maybe SnapshotCorruptedError
SnapshotCorruptedError -> String
SnapshotCorruptedError -> SomeException
forall e.
(Typeable e, Show e) =>
(e -> SomeException)
-> (SomeException -> Maybe e) -> (e -> String) -> Exception e
$ctoException :: SnapshotCorruptedError -> SomeException
toException :: SnapshotCorruptedError -> SomeException
$cfromException :: SomeException -> Maybe SnapshotCorruptedError
fromException :: SomeException -> Maybe SnapshotCorruptedError
$cdisplayException :: SnapshotCorruptedError -> String
displayException :: SnapshotCorruptedError -> String
Exception)

-- | The named snapshot is not compatible with the expected type.
data SnapshotNotCompatibleError
    = -- | The named snapshot is not compatible with the current public API module.
      --   For instance, the snapshot was created using the simple API, but was opened using the full API.
      ErrSnapshotWrongTableType
        !SnapshotName
        -- | Expected type
        !SnapshotTableType
        -- | Actual type
        !SnapshotTableType
    | -- | The named snapshot is not compatible with the given label.
      ErrSnapshotWrongLabel
        !SnapshotName
        -- | Expected label
        !SnapshotLabel
        -- | Actual label
        !SnapshotLabel
    deriving stock (Int -> SnapshotNotCompatibleError -> ShowS
[SnapshotNotCompatibleError] -> ShowS
SnapshotNotCompatibleError -> String
(Int -> SnapshotNotCompatibleError -> ShowS)
-> (SnapshotNotCompatibleError -> String)
-> ([SnapshotNotCompatibleError] -> ShowS)
-> Show SnapshotNotCompatibleError
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> SnapshotNotCompatibleError -> ShowS
showsPrec :: Int -> SnapshotNotCompatibleError -> ShowS
$cshow :: SnapshotNotCompatibleError -> String
show :: SnapshotNotCompatibleError -> String
$cshowList :: [SnapshotNotCompatibleError] -> ShowS
showList :: [SnapshotNotCompatibleError] -> ShowS
Show, SnapshotNotCompatibleError -> SnapshotNotCompatibleError -> Bool
(SnapshotNotCompatibleError -> SnapshotNotCompatibleError -> Bool)
-> (SnapshotNotCompatibleError
    -> SnapshotNotCompatibleError -> Bool)
-> Eq SnapshotNotCompatibleError
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: SnapshotNotCompatibleError -> SnapshotNotCompatibleError -> Bool
== :: SnapshotNotCompatibleError -> SnapshotNotCompatibleError -> Bool
$c/= :: SnapshotNotCompatibleError -> SnapshotNotCompatibleError -> Bool
/= :: SnapshotNotCompatibleError -> SnapshotNotCompatibleError -> Bool
Eq)
    deriving anyclass (Show SnapshotNotCompatibleError
Typeable SnapshotNotCompatibleError
(Typeable SnapshotNotCompatibleError,
 Show SnapshotNotCompatibleError) =>
(SnapshotNotCompatibleError -> SomeException)
-> (SomeException -> Maybe SnapshotNotCompatibleError)
-> (SnapshotNotCompatibleError -> String)
-> Exception SnapshotNotCompatibleError
SomeException -> Maybe SnapshotNotCompatibleError
SnapshotNotCompatibleError -> String
SnapshotNotCompatibleError -> SomeException
forall e.
(Typeable e, Show e) =>
(e -> SomeException)
-> (SomeException -> Maybe e) -> (e -> String) -> Exception e
$ctoException :: SnapshotNotCompatibleError -> SomeException
toException :: SnapshotNotCompatibleError -> SomeException
$cfromException :: SomeException -> Maybe SnapshotNotCompatibleError
fromException :: SomeException -> Maybe SnapshotNotCompatibleError
$cdisplayException :: SnapshotNotCompatibleError -> String
displayException :: SnapshotNotCompatibleError -> String
Exception)

{-# SPECIALISE openSnapshot ::
     Session IO h
  -> SnapshotLabel
  -> SnapshotTableType
  -> TableConfigOverride
  -> SnapshotName
  -> ResolveSerialisedValue
  -> IO (Table IO h) #-}
-- |  See 'Database.LSMTree.Normal.openSnapshot'.
openSnapshot ::
     (MonadMask m, MonadMVar m, MonadST m, MonadSTM m)
  => Session m h
  -> SnapshotLabel -- ^ Expected label
  -> SnapshotTableType -- ^ Expected table type
  -> TableConfigOverride -- ^ Optional config override
  -> SnapshotName
  -> ResolveSerialisedValue
  -> m (Table m h)
openSnapshot :: forall (m :: * -> *) h.
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m) =>
Session m h
-> SnapshotLabel
-> SnapshotTableType
-> TableConfigOverride
-> SnapshotName
-> ResolveSerialisedValue
-> m (Table m h)
openSnapshot Session m h
sesh SnapshotLabel
label SnapshotTableType
tableType TableConfigOverride
override SnapshotName
snap ResolveSerialisedValue
resolve =
  SnapshotName -> m (Table m h) -> m (Table m h)
forall (m :: * -> *) a. MonadCatch m => SnapshotName -> m a -> m a
wrapFileCorruptedErrorAsSnapshotCorruptedError SnapshotName
snap (m (Table m h) -> m (Table m h)) -> m (Table m h) -> m (Table m h)
forall a b. (a -> b) -> a -> b
$ do
    Tracer m LSMTreeTrace -> LSMTreeTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith (Session m h -> Tracer m LSMTreeTrace
forall (m :: * -> *) h. Session m h -> Tracer m LSMTreeTrace
sessionTracer Session m h
sesh) (LSMTreeTrace -> m ()) -> LSMTreeTrace -> m ()
forall a b. (a -> b) -> a -> b
$ SnapshotName -> TableConfigOverride -> LSMTreeTrace
TraceOpenSnapshot SnapshotName
snap TableConfigOverride
override
    Session m h -> (SessionEnv m h -> m (Table m h)) -> m (Table m h)
forall (m :: * -> *) h a.
(MonadSTM m, MonadThrow m) =>
Session m h -> (SessionEnv m h -> m a) -> m a
withOpenSession Session m h
sesh ((SessionEnv m h -> m (Table m h)) -> m (Table m h))
-> (SessionEnv m h -> m (Table m h)) -> m (Table m h)
forall a b. (a -> b) -> a -> b
$ \SessionEnv m h
seshEnv -> do
      (ActionRegistry m -> m (Table m h)) -> m (Table m h)
forall (m :: * -> *) a.
(PrimMonad m, MonadCatch m) =>
(ActionRegistry m -> m a) -> m a
withActionRegistry ((ActionRegistry m -> m (Table m h)) -> m (Table m h))
-> (ActionRegistry m -> m (Table m h)) -> m (Table m h)
forall a b. (a -> b) -> a -> b
$ \ActionRegistry m
reg -> do
        let hfs :: HasFS m h
hfs     = SessionEnv m h -> HasFS m h
forall (m :: * -> *) h. SessionEnv m h -> HasFS m h
sessionHasFS SessionEnv m h
seshEnv
            hbio :: HasBlockIO m h
hbio    = SessionEnv m h -> HasBlockIO m h
forall (m :: * -> *) h. SessionEnv m h -> HasBlockIO m h
sessionHasBlockIO SessionEnv m h
seshEnv
            uc :: UniqCounter m
uc      = SessionEnv m h -> UniqCounter m
forall (m :: * -> *) h. SessionEnv m h -> UniqCounter m
sessionUniqCounter SessionEnv m h
seshEnv

        -- Guard that the snapshot exists
        let snapDir :: NamedSnapshotDir
snapDir = SessionRoot -> SnapshotName -> NamedSnapshotDir
Paths.namedSnapshotDir (SessionEnv m h -> SessionRoot
forall (m :: * -> *) h. SessionEnv m h -> SessionRoot
sessionRoot SessionEnv m h
seshEnv) SnapshotName
snap
        HasFS m h -> HasCallStack => FsPath -> m Bool
forall (m :: * -> *) h.
HasFS m h -> HasCallStack => FsPath -> m Bool
FS.doesDirectoryExist HasFS m h
hfs (NamedSnapshotDir -> FsPath
Paths.getNamedSnapshotDir NamedSnapshotDir
snapDir) m Bool -> (Bool -> m ()) -> m ()
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \Bool
b ->
          Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
b (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ SnapshotDoesNotExistError -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (SnapshotName -> SnapshotDoesNotExistError
ErrSnapshotDoesNotExist SnapshotName
snap)

        let SnapshotMetaDataFile FsPath
contentPath = NamedSnapshotDir -> SnapshotMetaDataFile
Paths.snapshotMetaDataFile NamedSnapshotDir
snapDir
            SnapshotMetaDataChecksumFile FsPath
checksumPath = NamedSnapshotDir -> SnapshotMetaDataChecksumFile
Paths.snapshotMetaDataChecksumFile NamedSnapshotDir
snapDir

        SnapshotMetaData
snapMetaData <- HasFS m h -> FsPath -> FsPath -> m SnapshotMetaData
forall (m :: * -> *) h.
MonadThrow m =>
HasFS m h -> FsPath -> FsPath -> m SnapshotMetaData
readFileSnapshotMetaData HasFS m h
hfs FsPath
contentPath FsPath
checksumPath

        let SnapshotMetaData SnapshotLabel
label' SnapshotTableType
tableType' TableConfig
conf RunNumber
snapWriteBuffer SnapLevels SnapshotRun
snapLevels Maybe (SnapMergingTree SnapshotRun)
mTreeOpt = SnapshotMetaData
snapMetaData

        Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (SnapshotTableType
tableType SnapshotTableType -> SnapshotTableType -> Bool
forall a. Eq a => a -> a -> Bool
== SnapshotTableType
tableType') (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
          SnapshotNotCompatibleError -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (SnapshotName
-> SnapshotTableType
-> SnapshotTableType
-> SnapshotNotCompatibleError
ErrSnapshotWrongTableType SnapshotName
snap SnapshotTableType
tableType SnapshotTableType
tableType')

        Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (SnapshotLabel
label SnapshotLabel -> SnapshotLabel -> Bool
forall a. Eq a => a -> a -> Bool
== SnapshotLabel
label') (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
          SnapshotNotCompatibleError -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (SnapshotName
-> SnapshotLabel -> SnapshotLabel -> SnapshotNotCompatibleError
ErrSnapshotWrongLabel SnapshotName
snap SnapshotLabel
label SnapshotLabel
label')

        let conf' :: TableConfig
conf' = TableConfigOverride -> TableConfig -> TableConfig
applyOverride TableConfigOverride
override TableConfig
conf
        ArenaManager (PrimState m)
am <- m (ArenaManager (PrimState m))
forall (m :: * -> *). PrimMonad m => m (ArenaManager (PrimState m))
newArenaManager

        let activeDir :: ActiveDir
activeDir = SessionRoot -> ActiveDir
Paths.activeDir (SessionEnv m h -> SessionRoot
forall (m :: * -> *) h. SessionEnv m h -> SessionRoot
sessionRoot SessionEnv m h
seshEnv)

        -- Read write buffer
        let snapWriteBufferPaths :: WriteBufferFsPaths
snapWriteBufferPaths = FsPath -> RunNumber -> WriteBufferFsPaths
Paths.WriteBufferFsPaths (NamedSnapshotDir -> FsPath
Paths.getNamedSnapshotDir NamedSnapshotDir
snapDir) RunNumber
snapWriteBuffer
        (WriteBuffer
tableWriteBuffer, Ref (WriteBufferBlobs m h)
tableWriteBufferBlobs) <-
          ActionRegistry m
-> ResolveSerialisedValue
-> HasFS m h
-> HasBlockIO m h
-> UniqCounter m
-> ActiveDir
-> WriteBufferFsPaths
-> m (WriteBuffer, Ref (WriteBufferBlobs m h))
forall (m :: * -> *) h.
(MonadMVar m, MonadMask m, MonadSTM m, MonadST m) =>
ActionRegistry m
-> ResolveSerialisedValue
-> HasFS m h
-> HasBlockIO m h
-> UniqCounter m
-> ActiveDir
-> WriteBufferFsPaths
-> m (WriteBuffer, Ref (WriteBufferBlobs m h))
openWriteBuffer ActionRegistry m
reg ResolveSerialisedValue
resolve HasFS m h
hfs HasBlockIO m h
hbio UniqCounter m
uc ActiveDir
activeDir WriteBufferFsPaths
snapWriteBufferPaths

        -- Hard link runs into the active directory,
        SnapLevels (Ref (Run m h))
snapLevels' <- (SnapshotRun -> m (Ref (Run m h)))
-> SnapLevels SnapshotRun -> m (SnapLevels (Ref (Run m h)))
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> SnapLevels a -> f (SnapLevels b)
traverse (HasFS m h
-> HasBlockIO m h
-> UniqCounter m
-> ActionRegistry m
-> NamedSnapshotDir
-> ActiveDir
-> SnapshotRun
-> m (Ref (Run m h))
forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadST m) =>
HasFS m h
-> HasBlockIO m h
-> UniqCounter m
-> ActionRegistry m
-> NamedSnapshotDir
-> ActiveDir
-> SnapshotRun
-> m (Ref (Run m h))
openRun HasFS m h
hfs HasBlockIO m h
hbio UniqCounter m
uc ActionRegistry m
reg NamedSnapshotDir
snapDir ActiveDir
activeDir) SnapLevels SnapshotRun
snapLevels
        UnionLevel m h
unionLevel <- case Maybe (SnapMergingTree SnapshotRun)
mTreeOpt of
              Maybe (SnapMergingTree SnapshotRun)
Nothing -> UnionLevel m h -> m (UnionLevel m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure UnionLevel m h
forall (m :: * -> *) h. UnionLevel m h
NoUnion
              Just SnapMergingTree SnapshotRun
mTree -> do
                SnapMergingTree (Ref (Run m h))
snapTree <- (SnapshotRun -> m (Ref (Run m h)))
-> SnapMergingTree SnapshotRun
-> m (SnapMergingTree (Ref (Run m h)))
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> SnapMergingTree a -> f (SnapMergingTree b)
traverse (HasFS m h
-> HasBlockIO m h
-> UniqCounter m
-> ActionRegistry m
-> NamedSnapshotDir
-> ActiveDir
-> SnapshotRun
-> m (Ref (Run m h))
forall (m :: * -> *) h.
(MonadMask m, MonadSTM m, MonadST m) =>
HasFS m h
-> HasBlockIO m h
-> UniqCounter m
-> ActionRegistry m
-> NamedSnapshotDir
-> ActiveDir
-> SnapshotRun
-> m (Ref (Run m h))
openRun HasFS m h
hfs HasBlockIO m h
hbio UniqCounter m
uc ActionRegistry m
reg NamedSnapshotDir
snapDir ActiveDir
activeDir) SnapMergingTree SnapshotRun
mTree
                Ref (MergingTree m h)
mt <- HasFS m h
-> HasBlockIO m h
-> UniqCounter m
-> ResolveSerialisedValue
-> ActiveDir
-> ActionRegistry m
-> SnapMergingTree (Ref (Run m h))
-> m (Ref (MergingTree m h))
forall (m :: * -> *) h.
(MonadMask m, MonadMVar m, MonadSTM m, MonadST m) =>
HasFS m h
-> HasBlockIO m h
-> UniqCounter m
-> ResolveSerialisedValue
-> ActiveDir
-> ActionRegistry m
-> SnapMergingTree (Ref (Run m h))
-> m (Ref (MergingTree m h))
fromSnapMergingTree HasFS m h
hfs HasBlockIO m h
hbio UniqCounter m
uc ResolveSerialisedValue
resolve ActiveDir
activeDir ActionRegistry m
reg SnapMergingTree (Ref (Run m h))
snapTree
                (Ref (Run m h) -> m ()) -> SnapMergingTree (Ref (Run m h)) -> m ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ (ActionRegistry m -> m () -> m ()
forall (m :: * -> *).
(PrimMonad m, HasCallStack) =>
ActionRegistry m -> m () -> m ()
delayedCommit ActionRegistry m
reg (m () -> m ()) -> (Ref (Run m h) -> m ()) -> Ref (Run m h) -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Ref (Run m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef) SnapMergingTree (Ref (Run m h))
snapTree
                UnionLevel m h -> m (UnionLevel m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Ref (MergingTree m h) -> UnionLevel m h
forall (m :: * -> *) h. Ref (MergingTree m h) -> UnionLevel m h
Union Ref (MergingTree m h)
mt)

        -- Convert from the snapshot format, restoring merge progress in the process
        Levels m h
tableLevels <- HasFS m h
-> HasBlockIO m h
-> UniqCounter m
-> TableConfig
-> ResolveSerialisedValue
-> ActionRegistry m
-> ActiveDir
-> SnapLevels (Ref (Run m h))
-> m (Levels m h)
forall (m :: * -> *) h.
(MonadMask m, MonadMVar m, MonadSTM m, MonadST m) =>
HasFS m h
-> HasBlockIO m h
-> UniqCounter m
-> TableConfig
-> ResolveSerialisedValue
-> ActionRegistry m
-> ActiveDir
-> SnapLevels (Ref (Run m h))
-> m (Levels m h)
fromSnapLevels HasFS m h
hfs HasBlockIO m h
hbio UniqCounter m
uc TableConfig
conf ResolveSerialisedValue
resolve ActionRegistry m
reg ActiveDir
activeDir SnapLevels (Ref (Run m h))
snapLevels'
        (Ref (Run m h) -> m ()) -> SnapLevels (Ref (Run m h)) -> m ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ (ActionRegistry m -> m () -> m ()
forall (m :: * -> *).
(PrimMonad m, HasCallStack) =>
ActionRegistry m -> m () -> m ()
delayedCommit ActionRegistry m
reg (m () -> m ()) -> (Ref (Run m h) -> m ()) -> Ref (Run m h) -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Ref (Run m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef) SnapLevels (Ref (Run m h))
snapLevels'

        LevelsCache m h
tableCache <- ActionRegistry m -> Levels m h -> m (LevelsCache m h)
forall (m :: * -> *) h.
(PrimMonad m, MonadMVar m, MonadMask m) =>
ActionRegistry m -> Levels m h -> m (LevelsCache m h)
mkLevelsCache ActionRegistry m
reg Levels m h
tableLevels
        ActionRegistry m
-> Session m h
-> SessionEnv m h
-> TableConfig
-> ArenaManager (PrimState m)
-> TableContent m h
-> m (Table m h)
forall (m :: * -> *) h.
(MonadSTM m, MonadMVar m, PrimMonad m) =>
ActionRegistry m
-> Session m h
-> SessionEnv m h
-> TableConfig
-> ArenaManager (PrimState m)
-> TableContent m h
-> m (Table m h)
newWith ActionRegistry m
reg Session m h
sesh SessionEnv m h
seshEnv TableConfig
conf' ArenaManager (PrimState m)
am (TableContent m h -> m (Table m h))
-> TableContent m h -> m (Table m h)
forall a b. (a -> b) -> a -> b
$! TableContent {
            WriteBuffer
tableWriteBuffer :: WriteBuffer
tableWriteBuffer :: WriteBuffer
tableWriteBuffer
          , Ref (WriteBufferBlobs m h)
tableWriteBufferBlobs :: Ref (WriteBufferBlobs m h)
tableWriteBufferBlobs :: Ref (WriteBufferBlobs m h)
tableWriteBufferBlobs
          , Levels m h
tableLevels :: Levels m h
tableLevels :: Levels m h
tableLevels
          , LevelsCache m h
tableCache :: LevelsCache m h
tableCache :: LevelsCache m h
tableCache
          , tableUnionLevel :: UnionLevel m h
tableUnionLevel = UnionLevel m h
unionLevel
          }

{-# SPECIALISE wrapFileCorruptedErrorAsSnapshotCorruptedError ::
       SnapshotName
    -> IO a
    -> IO a
    #-}
wrapFileCorruptedErrorAsSnapshotCorruptedError ::
       forall m a.
       (MonadCatch m)
    => SnapshotName
    -> m a
    -> m a
wrapFileCorruptedErrorAsSnapshotCorruptedError :: forall (m :: * -> *) a. MonadCatch m => SnapshotName -> m a -> m a
wrapFileCorruptedErrorAsSnapshotCorruptedError SnapshotName
snapshotName m a
action =
    m a
action m a -> [Handler m a] -> m a
forall (m :: * -> *) a. MonadCatch m => m a -> [Handler m a] -> m a
`catches` [Handler m a]
handlers
    where
        handlers :: [Handler m a]
        handlers :: [Handler m a]
handlers =
            [ (FileCorruptedError -> m a) -> Handler m a
forall (m :: * -> *) a e. Exception e => (e -> m a) -> Handler m a
Handler ((FileCorruptedError -> m a) -> Handler m a)
-> (FileCorruptedError -> m a) -> Handler m a
forall a b. (a -> b) -> a -> b
$ SnapshotCorruptedError -> m a
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (SnapshotCorruptedError -> m a)
-> (FileCorruptedError -> SnapshotCorruptedError)
-> FileCorruptedError
-> m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. FileCorruptedError -> SnapshotCorruptedError
wrapFileCorruptedError
            , (AbortActionRegistryError -> m a) -> Handler m a
forall (m :: * -> *) a e. Exception e => (e -> m a) -> Handler m a
Handler ((AbortActionRegistryError -> m a) -> Handler m a)
-> (AbortActionRegistryError -> m a) -> Handler m a
forall a b. (a -> b) -> a -> b
$ AbortActionRegistryError -> m a
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (AbortActionRegistryError -> m a)
-> (AbortActionRegistryError -> AbortActionRegistryError)
-> AbortActionRegistryError
-> m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. AbortActionRegistryError -> AbortActionRegistryError
wrapAbortActionRegistryError
            , (CommitActionRegistryError -> m a) -> Handler m a
forall (m :: * -> *) a e. Exception e => (e -> m a) -> Handler m a
Handler ((CommitActionRegistryError -> m a) -> Handler m a)
-> (CommitActionRegistryError -> m a) -> Handler m a
forall a b. (a -> b) -> a -> b
$ CommitActionRegistryError -> m a
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (CommitActionRegistryError -> m a)
-> (CommitActionRegistryError -> CommitActionRegistryError)
-> CommitActionRegistryError
-> m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. CommitActionRegistryError -> CommitActionRegistryError
wrapCommitActionRegistryError
            ]

        -- TODO: This erases the `ExceptionContext` of the underlying `FileCorruptedError`,
        --       `AbortActionRegistryError`, or `CommitActionRegistryError`.
        --       Unfortunately, the API exposed by `io-classes` does not currently expose
        --       any primitives that could be used to preserve the `ExceptionContext`.
        wrapSomeException :: SomeException -> SomeException
        wrapSomeException :: SomeException -> SomeException
wrapSomeException SomeException
e =
            SomeException -> Maybe SomeException -> SomeException
forall a. a -> Maybe a -> a
fromMaybe SomeException
e (Maybe SomeException -> SomeException)
-> ([Maybe SomeException] -> Maybe SomeException)
-> [Maybe SomeException]
-> SomeException
forall b c a. (b -> c) -> (a -> b) -> a -> c
. First SomeException -> Maybe SomeException
forall a. First a -> Maybe a
getFirst (First SomeException -> Maybe SomeException)
-> ([Maybe SomeException] -> First SomeException)
-> [Maybe SomeException]
-> Maybe SomeException
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [First SomeException] -> First SomeException
forall a. Monoid a => [a] -> a
mconcat ([First SomeException] -> First SomeException)
-> ([Maybe SomeException] -> [First SomeException])
-> [Maybe SomeException]
-> First SomeException
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Maybe SomeException -> First SomeException)
-> [Maybe SomeException] -> [First SomeException]
forall a b. (a -> b) -> [a] -> [b]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Maybe SomeException -> First SomeException
forall a. Maybe a -> First a
First ([Maybe SomeException] -> SomeException)
-> [Maybe SomeException] -> SomeException
forall a b. (a -> b) -> a -> b
$
                [ SnapshotCorruptedError -> SomeException
forall e. Exception e => e -> SomeException
toException (SnapshotCorruptedError -> SomeException)
-> (FileCorruptedError -> SnapshotCorruptedError)
-> FileCorruptedError
-> SomeException
forall b c a. (b -> c) -> (a -> b) -> a -> c
. FileCorruptedError -> SnapshotCorruptedError
wrapFileCorruptedError (FileCorruptedError -> SomeException)
-> Maybe FileCorruptedError -> Maybe SomeException
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> SomeException -> Maybe FileCorruptedError
forall e. Exception e => SomeException -> Maybe e
fromException SomeException
e
                , AbortActionRegistryError -> SomeException
forall e. Exception e => e -> SomeException
toException (AbortActionRegistryError -> SomeException)
-> (AbortActionRegistryError -> AbortActionRegistryError)
-> AbortActionRegistryError
-> SomeException
forall b c a. (b -> c) -> (a -> b) -> a -> c
. AbortActionRegistryError -> AbortActionRegistryError
wrapAbortActionRegistryError (AbortActionRegistryError -> SomeException)
-> Maybe AbortActionRegistryError -> Maybe SomeException
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> SomeException -> Maybe AbortActionRegistryError
forall e. Exception e => SomeException -> Maybe e
fromException SomeException
e
                , CommitActionRegistryError -> SomeException
forall e. Exception e => e -> SomeException
toException (CommitActionRegistryError -> SomeException)
-> (CommitActionRegistryError -> CommitActionRegistryError)
-> CommitActionRegistryError
-> SomeException
forall b c a. (b -> c) -> (a -> b) -> a -> c
. CommitActionRegistryError -> CommitActionRegistryError
wrapCommitActionRegistryError (CommitActionRegistryError -> SomeException)
-> Maybe CommitActionRegistryError -> Maybe SomeException
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> SomeException -> Maybe CommitActionRegistryError
forall e. Exception e => SomeException -> Maybe e
fromException SomeException
e
                ]

        wrapFileCorruptedError :: FileCorruptedError -> SnapshotCorruptedError
        wrapFileCorruptedError :: FileCorruptedError -> SnapshotCorruptedError
wrapFileCorruptedError = SnapshotName -> FileCorruptedError -> SnapshotCorruptedError
ErrSnapshotCorrupted SnapshotName
snapshotName

        wrapAbortActionRegistryError :: AbortActionRegistryError -> AbortActionRegistryError
        wrapAbortActionRegistryError :: AbortActionRegistryError -> AbortActionRegistryError
wrapAbortActionRegistryError = \case
            AbortActionRegistryError AbortActionRegistryReason
reason NonEmpty ActionError
es ->
                AbortActionRegistryReason
-> NonEmpty ActionError -> AbortActionRegistryError
AbortActionRegistryError (AbortActionRegistryReason -> AbortActionRegistryReason
wrapAbortActionRegistryReason AbortActionRegistryReason
reason) ((SomeException -> SomeException) -> ActionError -> ActionError
mapActionError SomeException -> SomeException
wrapSomeException (ActionError -> ActionError)
-> NonEmpty ActionError -> NonEmpty ActionError
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> NonEmpty ActionError
es)

        wrapAbortActionRegistryReason :: AbortActionRegistryReason -> AbortActionRegistryReason
        wrapAbortActionRegistryReason :: AbortActionRegistryReason -> AbortActionRegistryReason
wrapAbortActionRegistryReason = \case
            ReasonExitCaseException SomeException
e -> SomeException -> AbortActionRegistryReason
ReasonExitCaseException (SomeException -> SomeException
wrapSomeException SomeException
e)
            AbortActionRegistryReason
ReasonExitCaseAbort -> AbortActionRegistryReason
ReasonExitCaseAbort

        wrapCommitActionRegistryError :: CommitActionRegistryError -> CommitActionRegistryError
        wrapCommitActionRegistryError :: CommitActionRegistryError -> CommitActionRegistryError
wrapCommitActionRegistryError = \case
            CommitActionRegistryError NonEmpty ActionError
es ->
                NonEmpty ActionError -> CommitActionRegistryError
CommitActionRegistryError ((SomeException -> SomeException) -> ActionError -> ActionError
mapActionError SomeException -> SomeException
wrapSomeException (ActionError -> ActionError)
-> NonEmpty ActionError -> NonEmpty ActionError
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> NonEmpty ActionError
es)

{-# SPECIALISE doesSnapshotExist ::
     Session IO h
  -> SnapshotName
  -> IO Bool #-}
-- |  See 'Database.LSMTree.Common.doesSnapshotExist'.
doesSnapshotExist ::
     (MonadMask m, MonadSTM m)
  => Session m h
  -> SnapshotName
  -> m Bool
doesSnapshotExist :: forall (m :: * -> *) h.
(MonadMask m, MonadSTM m) =>
Session m h -> SnapshotName -> m Bool
doesSnapshotExist Session m h
sesh SnapshotName
snap = Session m h -> (SessionEnv m h -> m Bool) -> m Bool
forall (m :: * -> *) h a.
(MonadSTM m, MonadThrow m) =>
Session m h -> (SessionEnv m h -> m a) -> m a
withOpenSession Session m h
sesh (SnapshotName -> SessionEnv m h -> m Bool
forall (m :: * -> *) h. SnapshotName -> SessionEnv m h -> m Bool
doesSnapshotDirExist SnapshotName
snap)

-- | Internal helper: Variant of 'doesSnapshotExist' that does not take a session lock.
doesSnapshotDirExist :: SnapshotName -> SessionEnv m h -> m Bool
doesSnapshotDirExist :: forall (m :: * -> *) h. SnapshotName -> SessionEnv m h -> m Bool
doesSnapshotDirExist SnapshotName
snap SessionEnv m h
seshEnv = do
  let snapDir :: NamedSnapshotDir
snapDir = SessionRoot -> SnapshotName -> NamedSnapshotDir
Paths.namedSnapshotDir (SessionEnv m h -> SessionRoot
forall (m :: * -> *) h. SessionEnv m h -> SessionRoot
sessionRoot SessionEnv m h
seshEnv) SnapshotName
snap
  HasFS m h -> HasCallStack => FsPath -> m Bool
forall (m :: * -> *) h.
HasFS m h -> HasCallStack => FsPath -> m Bool
FS.doesDirectoryExist (SessionEnv m h -> HasFS m h
forall (m :: * -> *) h. SessionEnv m h -> HasFS m h
sessionHasFS SessionEnv m h
seshEnv) (NamedSnapshotDir -> FsPath
Paths.getNamedSnapshotDir NamedSnapshotDir
snapDir)

{-# SPECIALISE deleteSnapshot ::
     Session IO h
  -> SnapshotName
  -> IO () #-}
-- |  See 'Database.LSMTree.Common.deleteSnapshot'.
deleteSnapshot ::
     (MonadMask m, MonadSTM m)
  => Session m h
  -> SnapshotName
  -> m ()
deleteSnapshot :: forall (m :: * -> *) h.
(MonadMask m, MonadSTM m) =>
Session m h -> SnapshotName -> m ()
deleteSnapshot Session m h
sesh SnapshotName
snap = do
    Tracer m LSMTreeTrace -> LSMTreeTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith (Session m h -> Tracer m LSMTreeTrace
forall (m :: * -> *) h. Session m h -> Tracer m LSMTreeTrace
sessionTracer Session m h
sesh) (LSMTreeTrace -> m ()) -> LSMTreeTrace -> m ()
forall a b. (a -> b) -> a -> b
$ SnapshotName -> LSMTreeTrace
TraceDeleteSnapshot SnapshotName
snap
    Session m h -> (SessionEnv m h -> m ()) -> m ()
forall (m :: * -> *) h a.
(MonadSTM m, MonadThrow m) =>
Session m h -> (SessionEnv m h -> m a) -> m a
withOpenSession Session m h
sesh ((SessionEnv m h -> m ()) -> m ())
-> (SessionEnv m h -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \SessionEnv m h
seshEnv -> do
      let snapDir :: NamedSnapshotDir
snapDir = SessionRoot -> SnapshotName -> NamedSnapshotDir
Paths.namedSnapshotDir (SessionEnv m h -> SessionRoot
forall (m :: * -> *) h. SessionEnv m h -> SessionRoot
sessionRoot SessionEnv m h
seshEnv) SnapshotName
snap
      Bool
snapshotExists <- SnapshotName -> SessionEnv m h -> m Bool
forall (m :: * -> *) h. SnapshotName -> SessionEnv m h -> m Bool
doesSnapshotDirExist SnapshotName
snap SessionEnv m h
seshEnv
      Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
snapshotExists (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ SnapshotDoesNotExistError -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (SnapshotName -> SnapshotDoesNotExistError
ErrSnapshotDoesNotExist SnapshotName
snap)
      HasFS m h -> HasCallStack => FsPath -> m ()
forall (m :: * -> *) h. HasFS m h -> HasCallStack => FsPath -> m ()
FS.removeDirectoryRecursive (SessionEnv m h -> HasFS m h
forall (m :: * -> *) h. SessionEnv m h -> HasFS m h
sessionHasFS SessionEnv m h
seshEnv) (NamedSnapshotDir -> FsPath
Paths.getNamedSnapshotDir NamedSnapshotDir
snapDir)

{-# SPECIALISE listSnapshots :: Session IO h -> IO [SnapshotName] #-}
-- |  See 'Database.LSMTree.Common.listSnapshots'.
listSnapshots ::
     (MonadMask m, MonadSTM m)
  => Session m h
  -> m [SnapshotName]
listSnapshots :: forall (m :: * -> *) h.
(MonadMask m, MonadSTM m) =>
Session m h -> m [SnapshotName]
listSnapshots Session m h
sesh = do
    Tracer m LSMTreeTrace -> LSMTreeTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith (Session m h -> Tracer m LSMTreeTrace
forall (m :: * -> *) h. Session m h -> Tracer m LSMTreeTrace
sessionTracer Session m h
sesh) LSMTreeTrace
TraceListSnapshots
    Session m h
-> (SessionEnv m h -> m [SnapshotName]) -> m [SnapshotName]
forall (m :: * -> *) h a.
(MonadSTM m, MonadThrow m) =>
Session m h -> (SessionEnv m h -> m a) -> m a
withOpenSession Session m h
sesh ((SessionEnv m h -> m [SnapshotName]) -> m [SnapshotName])
-> (SessionEnv m h -> m [SnapshotName]) -> m [SnapshotName]
forall a b. (a -> b) -> a -> b
$ \SessionEnv m h
seshEnv -> do
      let hfs :: HasFS m h
hfs = SessionEnv m h -> HasFS m h
forall (m :: * -> *) h. SessionEnv m h -> HasFS m h
sessionHasFS SessionEnv m h
seshEnv
          root :: SessionRoot
root = SessionEnv m h -> SessionRoot
forall (m :: * -> *) h. SessionEnv m h -> SessionRoot
sessionRoot SessionEnv m h
seshEnv
      Set String
contents <- HasFS m h -> HasCallStack => FsPath -> m (Set String)
forall (m :: * -> *) h.
HasFS m h -> HasCallStack => FsPath -> m (Set String)
FS.listDirectory HasFS m h
hfs (SessionRoot -> FsPath
Paths.snapshotsDir (SessionEnv m h -> SessionRoot
forall (m :: * -> *) h. SessionEnv m h -> SessionRoot
sessionRoot SessionEnv m h
seshEnv))
      [Maybe SnapshotName]
snaps <- (String -> m (Maybe SnapshotName))
-> [String] -> m [Maybe SnapshotName]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> [a] -> m [b]
mapM (HasFS m h -> SessionRoot -> String -> m (Maybe SnapshotName)
forall {m :: * -> *} {h}.
Monad m =>
HasFS m h -> SessionRoot -> String -> m (Maybe SnapshotName)
checkSnapshot HasFS m h
hfs SessionRoot
root) ([String] -> m [Maybe SnapshotName])
-> [String] -> m [Maybe SnapshotName]
forall a b. (a -> b) -> a -> b
$ Set String -> [String]
forall a. Set a -> [a]
Set.toList Set String
contents
      [SnapshotName] -> m [SnapshotName]
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ([SnapshotName] -> m [SnapshotName])
-> [SnapshotName] -> m [SnapshotName]
forall a b. (a -> b) -> a -> b
$ [Maybe SnapshotName] -> [SnapshotName]
forall a. [Maybe a] -> [a]
catMaybes [Maybe SnapshotName]
snaps
  where
    checkSnapshot :: HasFS m h -> SessionRoot -> String -> m (Maybe SnapshotName)
checkSnapshot HasFS m h
hfs SessionRoot
root String
s = do
      -- TODO: rethrow 'ErrSnapshotNameInvalid' as 'ErrSnapshotDirCorrupted'
      let snap :: SnapshotName
snap = String -> SnapshotName
Paths.toSnapshotName String
s
      -- check that it is a directory
      Bool
b <- HasFS m h -> HasCallStack => FsPath -> m Bool
forall (m :: * -> *) h.
HasFS m h -> HasCallStack => FsPath -> m Bool
FS.doesDirectoryExist HasFS m h
hfs
            (NamedSnapshotDir -> FsPath
Paths.getNamedSnapshotDir (NamedSnapshotDir -> FsPath) -> NamedSnapshotDir -> FsPath
forall a b. (a -> b) -> a -> b
$ SessionRoot -> SnapshotName -> NamedSnapshotDir
Paths.namedSnapshotDir SessionRoot
root SnapshotName
snap)
      if Bool
b then Maybe SnapshotName -> m (Maybe SnapshotName)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe SnapshotName -> m (Maybe SnapshotName))
-> Maybe SnapshotName -> m (Maybe SnapshotName)
forall a b. (a -> b) -> a -> b
$ SnapshotName -> Maybe SnapshotName
forall a. a -> Maybe a
Just SnapshotName
snap
            else Maybe SnapshotName -> m (Maybe SnapshotName)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe SnapshotName -> m (Maybe SnapshotName))
-> Maybe SnapshotName -> m (Maybe SnapshotName)
forall a b. (a -> b) -> a -> b
$ Maybe SnapshotName
forall a. Maybe a
Nothing

{-------------------------------------------------------------------------------
  Multiple writable tables
-------------------------------------------------------------------------------}

{-# SPECIALISE duplicate :: Table IO h -> IO (Table IO h) #-}
-- | See 'Database.LSMTree.Normal.duplicate'.
duplicate ::
     (MonadMask m, MonadMVar m, MonadST m, MonadSTM m)
  => Table m h
  -> m (Table m h)
duplicate :: forall (m :: * -> *) h.
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m) =>
Table m h -> m (Table m h)
duplicate t :: Table m h
t@Table{Tracer m TableTrace
RWVar m (TableState m h)
ArenaManager (PrimState m)
TableId
TableConfig
Session m h
tableConfig :: forall (m :: * -> *) h. Table m h -> TableConfig
tableState :: forall (m :: * -> *) h. Table m h -> RWVar m (TableState m h)
tableArenaManager :: forall (m :: * -> *) h. Table m h -> ArenaManager (PrimState m)
tableTracer :: forall (m :: * -> *) h. Table m h -> Tracer m TableTrace
tableId :: forall (m :: * -> *) h. Table m h -> TableId
tableSession :: forall (m :: * -> *) h. Table m h -> Session m h
tableConfig :: TableConfig
tableState :: RWVar m (TableState m h)
tableArenaManager :: ArenaManager (PrimState m)
tableTracer :: Tracer m TableTrace
tableId :: TableId
tableSession :: Session m h
..} = do
    Tracer m TableTrace -> TableTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith Tracer m TableTrace
tableTracer TableTrace
TraceDuplicate
    Table m h -> (TableEnv m h -> m (Table m h)) -> m (Table m h)
forall (m :: * -> *) h a.
(MonadSTM m, MonadThrow m) =>
Table m h -> (TableEnv m h -> m a) -> m a
withOpenTable Table m h
t ((TableEnv m h -> m (Table m h)) -> m (Table m h))
-> (TableEnv m h -> m (Table m h)) -> m (Table m h)
forall a b. (a -> b) -> a -> b
$ \TableEnv{RWVar m (TableContent m h)
SessionEnv m h
tableSessionEnv :: forall (m :: * -> *) h. TableEnv m h -> SessionEnv m h
tableContent :: forall (m :: * -> *) h. TableEnv m h -> RWVar m (TableContent m h)
tableSessionEnv :: SessionEnv m h
tableContent :: RWVar m (TableContent m h)
..} -> do
      -- We acquire a read-lock on the session open-state to prevent races, see
      -- 'sessionOpenTables'.
      Session m h -> (SessionEnv m h -> m (Table m h)) -> m (Table m h)
forall (m :: * -> *) h a.
(MonadSTM m, MonadThrow m) =>
Session m h -> (SessionEnv m h -> m a) -> m a
withOpenSession Session m h
tableSession ((SessionEnv m h -> m (Table m h)) -> m (Table m h))
-> (SessionEnv m h -> m (Table m h)) -> m (Table m h)
forall a b. (a -> b) -> a -> b
$ \SessionEnv m h
_ -> do
        (ActionRegistry m -> m (Table m h)) -> m (Table m h)
forall (m :: * -> *) a.
(PrimMonad m, MonadCatch m) =>
(ActionRegistry m -> m a) -> m a
withActionRegistry ((ActionRegistry m -> m (Table m h)) -> m (Table m h))
-> (ActionRegistry m -> m (Table m h)) -> m (Table m h)
forall a b. (a -> b) -> a -> b
$ \ActionRegistry m
reg -> do
          -- The table contents escape the read access, but we just added references
          -- to each run so it is safe.
          TableContent m h
content <- RWVar m (TableContent m h)
-> (TableContent m h -> m (TableContent m h))
-> m (TableContent m h)
forall (m :: * -> *) a b.
(MonadSTM m, MonadThrow m) =>
RWVar m a -> (a -> m b) -> m b
RW.withReadAccess RWVar m (TableContent m h)
tableContent (ActionRegistry m -> TableContent m h -> m (TableContent m h)
forall (m :: * -> *) h.
(PrimMonad m, MonadMask m) =>
ActionRegistry m -> TableContent m h -> m (TableContent m h)
duplicateTableContent ActionRegistry m
reg)
          ActionRegistry m
-> Session m h
-> SessionEnv m h
-> TableConfig
-> ArenaManager (PrimState m)
-> TableContent m h
-> m (Table m h)
forall (m :: * -> *) h.
(MonadSTM m, MonadMVar m, PrimMonad m) =>
ActionRegistry m
-> Session m h
-> SessionEnv m h
-> TableConfig
-> ArenaManager (PrimState m)
-> TableContent m h
-> m (Table m h)
newWith
            ActionRegistry m
reg
            Session m h
tableSession
            SessionEnv m h
tableSessionEnv
            TableConfig
tableConfig
            ArenaManager (PrimState m)
tableArenaManager
            TableContent m h
content

{-------------------------------------------------------------------------------
   Table union
-------------------------------------------------------------------------------}

-- | A table union was constructed with two tables that are not compatible.
data TableUnionNotCompatibleError
    = ErrTableUnionHandleTypeMismatch
        -- | The index of the first table.
        !Int
        -- | The type of the filesystem handle of the first table.
        !TypeRep
        -- | The index of the second table.
        !Int
        -- | The type of the filesystem handle of the second table.
        !TypeRep
    | ErrTableUnionSessionMismatch
        -- | The index of the first table.
        !Int
        -- | The session directory of the first table.
        !FsErrorPath
        -- | The index of the second table.
        !Int
        -- | The session directory of the second table.
        !FsErrorPath
    deriving stock (Int -> TableUnionNotCompatibleError -> ShowS
[TableUnionNotCompatibleError] -> ShowS
TableUnionNotCompatibleError -> String
(Int -> TableUnionNotCompatibleError -> ShowS)
-> (TableUnionNotCompatibleError -> String)
-> ([TableUnionNotCompatibleError] -> ShowS)
-> Show TableUnionNotCompatibleError
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> TableUnionNotCompatibleError -> ShowS
showsPrec :: Int -> TableUnionNotCompatibleError -> ShowS
$cshow :: TableUnionNotCompatibleError -> String
show :: TableUnionNotCompatibleError -> String
$cshowList :: [TableUnionNotCompatibleError] -> ShowS
showList :: [TableUnionNotCompatibleError] -> ShowS
Show, TableUnionNotCompatibleError
-> TableUnionNotCompatibleError -> Bool
(TableUnionNotCompatibleError
 -> TableUnionNotCompatibleError -> Bool)
-> (TableUnionNotCompatibleError
    -> TableUnionNotCompatibleError -> Bool)
-> Eq TableUnionNotCompatibleError
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: TableUnionNotCompatibleError
-> TableUnionNotCompatibleError -> Bool
== :: TableUnionNotCompatibleError
-> TableUnionNotCompatibleError -> Bool
$c/= :: TableUnionNotCompatibleError
-> TableUnionNotCompatibleError -> Bool
/= :: TableUnionNotCompatibleError
-> TableUnionNotCompatibleError -> Bool
Eq)
    deriving anyclass (Show TableUnionNotCompatibleError
Typeable TableUnionNotCompatibleError
(Typeable TableUnionNotCompatibleError,
 Show TableUnionNotCompatibleError) =>
(TableUnionNotCompatibleError -> SomeException)
-> (SomeException -> Maybe TableUnionNotCompatibleError)
-> (TableUnionNotCompatibleError -> String)
-> Exception TableUnionNotCompatibleError
SomeException -> Maybe TableUnionNotCompatibleError
TableUnionNotCompatibleError -> String
TableUnionNotCompatibleError -> SomeException
forall e.
(Typeable e, Show e) =>
(e -> SomeException)
-> (SomeException -> Maybe e) -> (e -> String) -> Exception e
$ctoException :: TableUnionNotCompatibleError -> SomeException
toException :: TableUnionNotCompatibleError -> SomeException
$cfromException :: SomeException -> Maybe TableUnionNotCompatibleError
fromException :: SomeException -> Maybe TableUnionNotCompatibleError
$cdisplayException :: TableUnionNotCompatibleError -> String
displayException :: TableUnionNotCompatibleError -> String
Exception)

{-# SPECIALISE unions :: NonEmpty (Table IO h) -> IO (Table IO h) #-}
-- | See 'Database.LSMTree.Normal.unions'.
unions ::
     (MonadMask m, MonadMVar m, MonadST m, MonadSTM m)
  => NonEmpty (Table m h)
  -> m (Table m h)
unions :: forall (m :: * -> *) h.
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m) =>
NonEmpty (Table m h) -> m (Table m h)
unions NonEmpty (Table m h)
ts = do
    Session m h
sesh <- NonEmpty (Table m h) -> m (Session m h)
forall (m :: * -> *) h.
(MonadSTM m, MonadThrow m) =>
NonEmpty (Table m h) -> m (Session m h)
ensureSessionsMatch NonEmpty (Table m h)
ts

    Tracer m LSMTreeTrace -> LSMTreeTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith (Session m h -> Tracer m LSMTreeTrace
forall (m :: * -> *) h. Session m h -> Tracer m LSMTreeTrace
sessionTracer Session m h
sesh) (LSMTreeTrace -> m ()) -> LSMTreeTrace -> m ()
forall a b. (a -> b) -> a -> b
$ NonEmpty TableId -> LSMTreeTrace
TraceUnions ((Table m h -> TableId) -> NonEmpty (Table m h) -> NonEmpty TableId
forall a b. (a -> b) -> NonEmpty a -> NonEmpty b
NE.map Table m h -> TableId
forall (m :: * -> *) h. Table m h -> TableId
tableId NonEmpty (Table m h)
ts)

    -- The TableConfig for the new table is taken from the last table in the
    -- union. This corresponds to the "Data.Map.union updates baseMap" order,
    -- where we take the config from the base map, not the updates.
    --
    -- This could be modified to take the new config as an input. It works to
    -- pick any config because the new table is almost completely fresh. It
    -- will have an empty write buffer and no runs in the normal levels. All
    -- the existing runs get squashed down into a single run before rejoining
    -- as a last level.
    let conf :: TableConfig
conf = Table m h -> TableConfig
forall (m :: * -> *) h. Table m h -> TableConfig
tableConfig (NonEmpty (Table m h) -> Table m h
forall a. NonEmpty a -> a
NE.last NonEmpty (Table m h)
ts)

    -- We acquire a read-lock on the session open-state to prevent races, see
    -- 'sessionOpenTables'.
    m (SessionState m h)
-> (SessionState m h -> m ())
-> (ActionRegistry m
    -> SessionState m h -> m (SessionState m h, Table m h))
-> m (Table m h)
forall (m :: * -> *) st a.
(PrimMonad m, MonadCatch m) =>
m st
-> (st -> m ()) -> (ActionRegistry m -> st -> m (st, a)) -> m a
modifyWithActionRegistry
      (STM m (SessionState m h) -> m (SessionState m h)
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m (SessionState m h) -> m (SessionState m h))
-> STM m (SessionState m h) -> m (SessionState m h)
forall a b. (a -> b) -> a -> b
$ RWVar m (SessionState m h) -> STM m (SessionState m h)
forall (m :: * -> *) a. MonadSTM m => RWVar m a -> STM m a
RW.unsafeAcquireReadAccess (Session m h -> RWVar m (SessionState m h)
forall (m :: * -> *) h. Session m h -> RWVar m (SessionState m h)
sessionState Session m h
sesh))
      (\SessionState m h
_ -> STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ RWVar m (SessionState m h) -> STM m ()
forall (m :: * -> *) a. MonadSTM m => RWVar m a -> STM m ()
RW.unsafeReleaseReadAccess (Session m h -> RWVar m (SessionState m h)
forall (m :: * -> *) h. Session m h -> RWVar m (SessionState m h)
sessionState Session m h
sesh)) ((ActionRegistry m
  -> SessionState m h -> m (SessionState m h, Table m h))
 -> m (Table m h))
-> (ActionRegistry m
    -> SessionState m h -> m (SessionState m h, Table m h))
-> m (Table m h)
forall a b. (a -> b) -> a -> b
$
      \ActionRegistry m
reg -> \case
        SessionState m h
SessionClosed -> SessionClosedError -> m (SessionState m h, Table m h)
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO SessionClosedError
ErrSessionClosed
        seshState :: SessionState m h
seshState@(SessionOpen SessionEnv m h
seshEnv) -> do
          Table m h
t <- ActionRegistry m
-> Session m h
-> SessionEnv m h
-> TableConfig
-> NonEmpty (Table m h)
-> m (Table m h)
forall (m :: * -> *) h.
(MonadSTM m, MonadMask m, MonadMVar m, MonadST m) =>
ActionRegistry m
-> Session m h
-> SessionEnv m h
-> TableConfig
-> NonEmpty (Table m h)
-> m (Table m h)
unionsInOpenSession ActionRegistry m
reg Session m h
sesh SessionEnv m h
seshEnv TableConfig
conf NonEmpty (Table m h)
ts
          (SessionState m h, Table m h) -> m (SessionState m h, Table m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (SessionState m h
seshState, Table m h
t)

{-# SPECIALISE unionsInOpenSession ::
     ActionRegistry IO
  -> Session IO h
  -> SessionEnv IO h
  -> TableConfig
  -> NonEmpty (Table IO h)
  -> IO (Table IO h) #-}
unionsInOpenSession ::
     (MonadSTM m, MonadMask m, MonadMVar m, MonadST m)
  => ActionRegistry m
  -> Session m h
  -> SessionEnv m h
  -> TableConfig
  -> NonEmpty (Table m h)
  -> m (Table m h)
unionsInOpenSession :: forall (m :: * -> *) h.
(MonadSTM m, MonadMask m, MonadMVar m, MonadST m) =>
ActionRegistry m
-> Session m h
-> SessionEnv m h
-> TableConfig
-> NonEmpty (Table m h)
-> m (Table m h)
unionsInOpenSession ActionRegistry m
reg Session m h
sesh SessionEnv m h
seshEnv TableConfig
conf NonEmpty (Table m h)
ts = do

    [Ref (MergingTree m h)]
mts <- [Table m h]
-> (Table m h -> m (Ref (MergingTree m h)))
-> m [Ref (MergingTree m h)]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM (NonEmpty (Table m h) -> [Table m h]
forall a. NonEmpty a -> [a]
NE.toList NonEmpty (Table m h)
ts) ((Table m h -> m (Ref (MergingTree m h)))
 -> m [Ref (MergingTree m h)])
-> (Table m h -> m (Ref (MergingTree m h)))
-> m [Ref (MergingTree m h)]
forall a b. (a -> b) -> a -> b
$ \Table m h
t ->
      Table m h
-> (TableEnv m h -> m (Ref (MergingTree m h)))
-> m (Ref (MergingTree m h))
forall (m :: * -> *) h a.
(MonadSTM m, MonadThrow m) =>
Table m h -> (TableEnv m h -> m a) -> m a
withOpenTable Table m h
t ((TableEnv m h -> m (Ref (MergingTree m h)))
 -> m (Ref (MergingTree m h)))
-> (TableEnv m h -> m (Ref (MergingTree m h)))
-> m (Ref (MergingTree m h))
forall a b. (a -> b) -> a -> b
$ \TableEnv m h
tEnv ->
        RWVar m (TableContent m h)
-> (TableContent m h -> m (Ref (MergingTree m h)))
-> m (Ref (MergingTree m h))
forall (m :: * -> *) a b.
(MonadSTM m, MonadThrow m) =>
RWVar m a -> (a -> m b) -> m b
RW.withReadAccess (TableEnv m h -> RWVar m (TableContent m h)
forall (m :: * -> *) h. TableEnv m h -> RWVar m (TableContent m h)
tableContent TableEnv m h
tEnv) ((TableContent m h -> m (Ref (MergingTree m h)))
 -> m (Ref (MergingTree m h)))
-> (TableContent m h -> m (Ref (MergingTree m h)))
-> m (Ref (MergingTree m h))
forall a b. (a -> b) -> a -> b
$ \TableContent m h
tc ->
          -- tableContentToMergingTree duplicates all runs and merges
          -- so the ones from the tableContent here do not escape
          -- the read access.
          ActionRegistry m
-> m (Ref (MergingTree m h))
-> (Ref (MergingTree m h) -> m ())
-> m (Ref (MergingTree m h))
forall (m :: * -> *) a.
(PrimMonad m, MonadMask m, HasCallStack) =>
ActionRegistry m -> m a -> (a -> m ()) -> m a
withRollback ActionRegistry m
reg
            (SessionEnv m h
-> TableConfig -> TableContent m h -> m (Ref (MergingTree m h))
forall (m :: * -> *) h.
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m) =>
SessionEnv m h
-> TableConfig -> TableContent m h -> m (Ref (MergingTree m h))
tableContentToMergingTree SessionEnv m h
seshEnv TableConfig
conf TableContent m h
tc)
            Ref (MergingTree m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef
    Ref (MergingTree m h)
mt <- ActionRegistry m
-> m (Ref (MergingTree m h))
-> (Ref (MergingTree m h) -> m ())
-> m (Ref (MergingTree m h))
forall (m :: * -> *) a.
(PrimMonad m, MonadMask m, HasCallStack) =>
ActionRegistry m -> m a -> (a -> m ()) -> m a
withRollback ActionRegistry m
reg ([Ref (MergingTree m h)] -> m (Ref (MergingTree m h))
forall (m :: * -> *) h.
(MonadMVar m, MonadMask m, PrimMonad m) =>
[Ref (MergingTree m h)] -> m (Ref (MergingTree m h))
newPendingUnionMerge [Ref (MergingTree m h)]
mts) Ref (MergingTree m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef

    -- The mts here is a temporary value, since newPendingUnionMerge
    -- will make its own references, so release mts at the end of
    -- the action registry bracket
    [Ref (MergingTree m h)] -> (Ref (MergingTree m h) -> m ()) -> m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [Ref (MergingTree m h)]
mts (ActionRegistry m -> m () -> m ()
forall (m :: * -> *).
(PrimMonad m, HasCallStack) =>
ActionRegistry m -> m () -> m ()
delayedCommit ActionRegistry m
reg (m () -> m ())
-> (Ref (MergingTree m h) -> m ()) -> Ref (MergingTree m h) -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Ref (MergingTree m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef)

    TableContent m h
empty <- SessionEnv m h -> ActionRegistry m -> m (TableContent m h)
forall (m :: * -> *) h.
(PrimMonad m, MonadMask m, MonadMVar m) =>
SessionEnv m h -> ActionRegistry m -> m (TableContent m h)
newEmptyTableContent SessionEnv m h
seshEnv ActionRegistry m
reg
    let content :: TableContent m h
content = TableContent m h
empty { tableUnionLevel = Union mt }

        -- Pick the arena manager to optimise the case of:
        -- someUpdates <> bigTableWithLotsOfLookups
        -- by reusing the arena manager from the last one.
        am :: ArenaManager (PrimState m)
am = Table m h -> ArenaManager (PrimState m)
forall (m :: * -> *) h. Table m h -> ArenaManager (PrimState m)
tableArenaManager (NonEmpty (Table m h) -> Table m h
forall a. NonEmpty a -> a
NE.last NonEmpty (Table m h)
ts)

    ActionRegistry m
-> Session m h
-> SessionEnv m h
-> TableConfig
-> ArenaManager (PrimState m)
-> TableContent m h
-> m (Table m h)
forall (m :: * -> *) h.
(MonadSTM m, MonadMVar m, PrimMonad m) =>
ActionRegistry m
-> Session m h
-> SessionEnv m h
-> TableConfig
-> ArenaManager (PrimState m)
-> TableContent m h
-> m (Table m h)
newWith ActionRegistry m
reg Session m h
sesh SessionEnv m h
seshEnv TableConfig
conf ArenaManager (PrimState m)
am TableContent m h
content

{-# SPECIALISE tableContentToMergingTree ::
     SessionEnv IO h
  -> TableConfig
  -> TableContent IO h
  -> IO (Ref (MergingTree IO h)) #-}
tableContentToMergingTree ::
     forall m h.
     (MonadMask m, MonadMVar m, MonadST m, MonadSTM m)
  => SessionEnv m h
  -> TableConfig
  -> TableContent m h
  -> m (Ref (MergingTree m h))
tableContentToMergingTree :: forall (m :: * -> *) h.
(MonadMask m, MonadMVar m, MonadST m, MonadSTM m) =>
SessionEnv m h
-> TableConfig -> TableContent m h -> m (Ref (MergingTree m h))
tableContentToMergingTree SessionEnv m h
seshEnv TableConfig
conf
                          tc :: TableContent m h
tc@TableContent {
                            Levels m h
tableLevels :: forall (m :: * -> *) h. TableContent m h -> Levels m h
tableLevels :: Levels m h
tableLevels,
                            UnionLevel m h
tableUnionLevel :: forall (m :: * -> *) h. TableContent m h -> UnionLevel m h
tableUnionLevel :: UnionLevel m h
tableUnionLevel
                          } =
    m (Maybe (Ref (Run m h)))
-> (Maybe (Ref (Run m h)) -> m ())
-> (Maybe (Ref (Run m h)) -> m (Ref (MergingTree m h)))
-> m (Ref (MergingTree m h))
forall a b c. m a -> (a -> m b) -> (a -> m c) -> m c
forall (m :: * -> *) a b c.
MonadThrow m =>
m a -> (a -> m b) -> (a -> m c) -> m c
bracket (SessionEnv m h
-> TableConfig -> TableContent m h -> m (Maybe (Ref (Run m h)))
forall (m :: * -> *) h.
(MonadMask m, MonadST m, MonadSTM m) =>
SessionEnv m h
-> TableConfig -> TableContent m h -> m (Maybe (Ref (Run m h)))
writeBufferToNewRun SessionEnv m h
seshEnv TableConfig
conf TableContent m h
tc)
            ((Ref (Run m h) -> m ()) -> Maybe (Ref (Run m h)) -> m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ Ref (Run m h) -> m ()
forall (m :: * -> *) obj.
(RefCounted m obj, PrimMonad m, MonadMask m, HasCallStack) =>
Ref obj -> m ()
releaseRef) ((Maybe (Ref (Run m h)) -> m (Ref (MergingTree m h)))
 -> m (Ref (MergingTree m h)))
-> (Maybe (Ref (Run m h)) -> m (Ref (MergingTree m h)))
-> m (Ref (MergingTree m h))
forall a b. (a -> b) -> a -> b
$ \Maybe (Ref (Run m h))
mwbr ->
      let runs :: [PreExistingRun m h]
          runs :: [PreExistingRun m h]
runs = Maybe (PreExistingRun m h) -> [PreExistingRun m h]
forall a. Maybe a -> [a]
maybeToList ((Ref (Run m h) -> PreExistingRun m h)
-> Maybe (Ref (Run m h)) -> Maybe (PreExistingRun m h)
forall a b. (a -> b) -> Maybe a -> Maybe b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Ref (Run m h) -> PreExistingRun m h
forall (m :: * -> *) h. Ref (Run m h) -> PreExistingRun m h
PreExistingRun Maybe (Ref (Run m h))
mwbr)
              [PreExistingRun m h]
-> [PreExistingRun m h] -> [PreExistingRun m h]
forall a. [a] -> [a] -> [a]
++ (Level m h -> [PreExistingRun m h])
-> [Level m h] -> [PreExistingRun m h]
forall (t :: * -> *) a b. Foldable t => (a -> [b]) -> t a -> [b]
concatMap Level m h -> [PreExistingRun m h]
levelToPreExistingRuns (Levels m h -> [Level m h]
forall a. Vector a -> [a]
V.toList Levels m h
tableLevels)
          -- any pre-existing union in the input table:
          unionmt :: Maybe (Ref (MergingTree m h))
unionmt = case UnionLevel m h
tableUnionLevel of
                    UnionLevel m h
NoUnion  -> Maybe (Ref (MergingTree m h))
forall a. Maybe a
Nothing
                    Union Ref (MergingTree m h)
mt -> Ref (MergingTree m h) -> Maybe (Ref (MergingTree m h))
forall a. a -> Maybe a
Just Ref (MergingTree m h)
mt
       in [PreExistingRun m h]
-> Maybe (Ref (MergingTree m h)) -> m (Ref (MergingTree m h))
forall (m :: * -> *) h.
(MonadMVar m, MonadMask m, PrimMonad m) =>
[PreExistingRun m h]
-> Maybe (Ref (MergingTree m h)) -> m (Ref (MergingTree m h))
newPendingLevelMerge [PreExistingRun m h]
runs Maybe (Ref (MergingTree m h))
unionmt
  where
    levelToPreExistingRuns :: Level m h -> [PreExistingRun m h]
    levelToPreExistingRuns :: Level m h -> [PreExistingRun m h]
levelToPreExistingRuns Level{IncomingRun m h
incomingRun :: IncomingRun m h
incomingRun :: forall (m :: * -> *) h. Level m h -> IncomingRun m h
incomingRun, Vector (Ref (Run m h))
residentRuns :: Vector (Ref (Run m h))
residentRuns :: forall (m :: * -> *) h. Level m h -> Vector (Ref (Run m h))
residentRuns} =
        case IncomingRun m h
incomingRun of
          Single        Ref (Run m h)
r  -> Ref (Run m h) -> PreExistingRun m h
forall (m :: * -> *) h. Ref (Run m h) -> PreExistingRun m h
PreExistingRun Ref (Run m h)
r
          Merging MergePolicyForLevel
_ NominalDebt
_ PrimVar (PrimState m) NominalCredits
_ Ref (MergingRun LevelMergeType m h)
mr -> Ref (MergingRun LevelMergeType m h) -> PreExistingRun m h
forall (m :: * -> *) h.
Ref (MergingRun LevelMergeType m h) -> PreExistingRun m h
PreExistingMergingRun Ref (MergingRun LevelMergeType m h)
mr
      PreExistingRun m h -> [PreExistingRun m h] -> [PreExistingRun m h]
forall a. a -> [a] -> [a]
: (Ref (Run m h) -> PreExistingRun m h)
-> [Ref (Run m h)] -> [PreExistingRun m h]
forall a b. (a -> b) -> [a] -> [b]
map Ref (Run m h) -> PreExistingRun m h
forall (m :: * -> *) h. Ref (Run m h) -> PreExistingRun m h
PreExistingRun (Vector (Ref (Run m h)) -> [Ref (Run m h)]
forall a. Vector a -> [a]
V.toList Vector (Ref (Run m h))
residentRuns)

--TODO: can we share this or move it to MergeSchedule?
{-# SPECIALISE writeBufferToNewRun ::
     SessionEnv IO h
  -> TableConfig
  -> TableContent IO h
  -> IO (Maybe (Ref (Run IO h))) #-}
writeBufferToNewRun ::
     (MonadMask m, MonadST m, MonadSTM m)
  => SessionEnv m h
  -> TableConfig
  -> TableContent m h
  -> m (Maybe (Ref (Run m h)))
writeBufferToNewRun :: forall (m :: * -> *) h.
(MonadMask m, MonadST m, MonadSTM m) =>
SessionEnv m h
-> TableConfig -> TableContent m h -> m (Maybe (Ref (Run m h)))
writeBufferToNewRun SessionEnv {
                      sessionRoot :: forall (m :: * -> *) h. SessionEnv m h -> SessionRoot
sessionRoot        = SessionRoot
root,
                      sessionHasFS :: forall (m :: * -> *) h. SessionEnv m h -> HasFS m h
sessionHasFS       = HasFS m h
hfs,
                      sessionHasBlockIO :: forall (m :: * -> *) h. SessionEnv m h -> HasBlockIO m h
sessionHasBlockIO  = HasBlockIO m h
hbio,
                      sessionUniqCounter :: forall (m :: * -> *) h. SessionEnv m h -> UniqCounter m
sessionUniqCounter = UniqCounter m
uc
                    }
                    TableConfig
conf
                    TableContent{
                      WriteBuffer
tableWriteBuffer :: forall (m :: * -> *) h. TableContent m h -> WriteBuffer
tableWriteBuffer :: WriteBuffer
tableWriteBuffer,
                      Ref (WriteBufferBlobs m h)
tableWriteBufferBlobs :: forall (m :: * -> *) h.
TableContent m h -> Ref (WriteBufferBlobs m h)
tableWriteBufferBlobs :: Ref (WriteBufferBlobs m h)
tableWriteBufferBlobs
                    }
  | WriteBuffer -> Bool
WB.null WriteBuffer
tableWriteBuffer = Maybe (Ref (Run m h)) -> m (Maybe (Ref (Run m h)))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe (Ref (Run m h))
forall a. Maybe a
Nothing
  | Bool
otherwise                = Ref (Run m h) -> Maybe (Ref (Run m h))
forall a. a -> Maybe a
Just (Ref (Run m h) -> Maybe (Ref (Run m h)))
-> m (Ref (Run m h)) -> m (Maybe (Ref (Run m h)))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> do
    !Unique
uniq <- UniqCounter m -> m Unique
forall (m :: * -> *). PrimMonad m => UniqCounter m -> m Unique
incrUniqCounter UniqCounter m
uc
    let (!RunParams
runParams, !RunFsPaths
runPaths) = ActiveDir
-> TableConfig -> Unique -> LevelNo -> (RunParams, RunFsPaths)
mergingRunParamsForLevel
                                   (SessionRoot -> ActiveDir
Paths.activeDir SessionRoot
root) TableConfig
conf Unique
uniq (Int -> LevelNo
LevelNo Int
1)
    HasFS m h
-> HasBlockIO m h
-> RunParams
-> RunFsPaths
-> WriteBuffer
-> Ref (WriteBufferBlobs m h)
-> m (Ref (Run m h))
forall (m :: * -> *) h.
(MonadST m, MonadSTM m, MonadMask m) =>
HasFS m h
-> HasBlockIO m h
-> RunParams
-> RunFsPaths
-> WriteBuffer
-> Ref (WriteBufferBlobs m h)
-> m (Ref (Run m h))
Run.fromWriteBuffer
      HasFS m h
hfs HasBlockIO m h
hbio
      RunParams
runParams RunFsPaths
runPaths
      WriteBuffer
tableWriteBuffer
      Ref (WriteBufferBlobs m h)
tableWriteBufferBlobs

{-# SPECIALISE ensureSessionsMatch ::
     NonEmpty (Table IO h)
  -> IO (Session IO h) #-}
-- | Check if all tables have the same session.
--   If so, return the session.
--   Otherwise, throw a 'TableUnionNotCompatibleError'.
ensureSessionsMatch ::
     (MonadSTM m, MonadThrow m)
  => NonEmpty (Table m h)
  -> m (Session m h)
ensureSessionsMatch :: forall (m :: * -> *) h.
(MonadSTM m, MonadThrow m) =>
NonEmpty (Table m h) -> m (Session m h)
ensureSessionsMatch (Table m h
t :| [Table m h]
ts) = do
  let sesh :: Session m h
sesh = Table m h -> Session m h
forall (m :: * -> *) h. Table m h -> Session m h
tableSession Table m h
t
  Session m h
-> (SessionEnv m h -> m (Session m h)) -> m (Session m h)
forall (m :: * -> *) h a.
(MonadSTM m, MonadThrow m) =>
Session m h -> (SessionEnv m h -> m a) -> m a
withOpenSession Session m h
sesh ((SessionEnv m h -> m (Session m h)) -> m (Session m h))
-> (SessionEnv m h -> m (Session m h)) -> m (Session m h)
forall a b. (a -> b) -> a -> b
$ \SessionEnv m h
seshEnv -> do
    let root :: FsErrorPath
root = HasFS m h -> FsPath -> FsErrorPath
forall (m :: * -> *) h. HasFS m h -> FsPath -> FsErrorPath
FS.mkFsErrorPath (SessionEnv m h -> HasFS m h
forall (m :: * -> *) h. SessionEnv m h -> HasFS m h
sessionHasFS SessionEnv m h
seshEnv) (SessionRoot -> FsPath
getSessionRoot (SessionEnv m h -> SessionRoot
forall (m :: * -> *) h. SessionEnv m h -> SessionRoot
sessionRoot SessionEnv m h
seshEnv))
    -- Check that the session roots for all tables are the same. There can only
    -- be one *open/active* session per directory because of cooperative file
    -- locks, so each unique *open* session has a unique session root. We check
    -- that all the table's sessions are open at the same time while comparing
    -- the session roots.
    [(Int, Table m h)] -> ((Int, Table m h) -> m ()) -> m ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ ([Int] -> [Table m h] -> [(Int, Table m h)]
forall a b. [a] -> [b] -> [(a, b)]
zip [Int
1..] [Table m h]
ts) (((Int, Table m h) -> m ()) -> m ())
-> ((Int, Table m h) -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \(Int
i, Table m h
t') -> do
      let sesh' :: Session m h
sesh' = Table m h -> Session m h
forall (m :: * -> *) h. Table m h -> Session m h
tableSession Table m h
t'
      Session m h -> (SessionEnv m h -> m ()) -> m ()
forall (m :: * -> *) h a.
(MonadSTM m, MonadThrow m) =>
Session m h -> (SessionEnv m h -> m a) -> m a
withOpenSession Session m h
sesh' ((SessionEnv m h -> m ()) -> m ())
-> (SessionEnv m h -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \SessionEnv m h
seshEnv' -> do
        let root' :: FsErrorPath
root' = HasFS m h -> FsPath -> FsErrorPath
forall (m :: * -> *) h. HasFS m h -> FsPath -> FsErrorPath
FS.mkFsErrorPath (SessionEnv m h -> HasFS m h
forall (m :: * -> *) h. SessionEnv m h -> HasFS m h
sessionHasFS SessionEnv m h
seshEnv') (SessionRoot -> FsPath
getSessionRoot (SessionEnv m h -> SessionRoot
forall (m :: * -> *) h. SessionEnv m h -> SessionRoot
sessionRoot SessionEnv m h
seshEnv'))
        -- TODO: compare LockFileHandle instead of SessionRoot (?).
        -- We can write an Eq instance for LockFileHandle based on pointer equality,
        -- just like base does for Handle.
        Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (FsErrorPath
root FsErrorPath -> FsErrorPath -> Bool
forall a. Eq a => a -> a -> Bool
== FsErrorPath
root') (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ TableUnionNotCompatibleError -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (TableUnionNotCompatibleError -> m ())
-> TableUnionNotCompatibleError -> m ()
forall a b. (a -> b) -> a -> b
$ Int
-> FsErrorPath
-> Int
-> FsErrorPath
-> TableUnionNotCompatibleError
ErrTableUnionSessionMismatch Int
0 FsErrorPath
root Int
i FsErrorPath
root'
    Session m h -> m (Session m h)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Session m h
sesh

{-------------------------------------------------------------------------------
  Table union: debt and credit
-------------------------------------------------------------------------------}

-- | See 'Database.LSMTree.Normal.UnionDebt'.
newtype UnionDebt = UnionDebt Int
  deriving newtype (Int -> UnionDebt -> ShowS
[UnionDebt] -> ShowS
UnionDebt -> String
(Int -> UnionDebt -> ShowS)
-> (UnionDebt -> String)
-> ([UnionDebt] -> ShowS)
-> Show UnionDebt
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> UnionDebt -> ShowS
showsPrec :: Int -> UnionDebt -> ShowS
$cshow :: UnionDebt -> String
show :: UnionDebt -> String
$cshowList :: [UnionDebt] -> ShowS
showList :: [UnionDebt] -> ShowS
Show, UnionDebt -> UnionDebt -> Bool
(UnionDebt -> UnionDebt -> Bool)
-> (UnionDebt -> UnionDebt -> Bool) -> Eq UnionDebt
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: UnionDebt -> UnionDebt -> Bool
== :: UnionDebt -> UnionDebt -> Bool
$c/= :: UnionDebt -> UnionDebt -> Bool
/= :: UnionDebt -> UnionDebt -> Bool
Eq, Eq UnionDebt
Eq UnionDebt =>
(UnionDebt -> UnionDebt -> Ordering)
-> (UnionDebt -> UnionDebt -> Bool)
-> (UnionDebt -> UnionDebt -> Bool)
-> (UnionDebt -> UnionDebt -> Bool)
-> (UnionDebt -> UnionDebt -> Bool)
-> (UnionDebt -> UnionDebt -> UnionDebt)
-> (UnionDebt -> UnionDebt -> UnionDebt)
-> Ord UnionDebt
UnionDebt -> UnionDebt -> Bool
UnionDebt -> UnionDebt -> Ordering
UnionDebt -> UnionDebt -> UnionDebt
forall a.
Eq a =>
(a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
$ccompare :: UnionDebt -> UnionDebt -> Ordering
compare :: UnionDebt -> UnionDebt -> Ordering
$c< :: UnionDebt -> UnionDebt -> Bool
< :: UnionDebt -> UnionDebt -> Bool
$c<= :: UnionDebt -> UnionDebt -> Bool
<= :: UnionDebt -> UnionDebt -> Bool
$c> :: UnionDebt -> UnionDebt -> Bool
> :: UnionDebt -> UnionDebt -> Bool
$c>= :: UnionDebt -> UnionDebt -> Bool
>= :: UnionDebt -> UnionDebt -> Bool
$cmax :: UnionDebt -> UnionDebt -> UnionDebt
max :: UnionDebt -> UnionDebt -> UnionDebt
$cmin :: UnionDebt -> UnionDebt -> UnionDebt
min :: UnionDebt -> UnionDebt -> UnionDebt
Ord, Integer -> UnionDebt
UnionDebt -> UnionDebt
UnionDebt -> UnionDebt -> UnionDebt
(UnionDebt -> UnionDebt -> UnionDebt)
-> (UnionDebt -> UnionDebt -> UnionDebt)
-> (UnionDebt -> UnionDebt -> UnionDebt)
-> (UnionDebt -> UnionDebt)
-> (UnionDebt -> UnionDebt)
-> (UnionDebt -> UnionDebt)
-> (Integer -> UnionDebt)
-> Num UnionDebt
forall a.
(a -> a -> a)
-> (a -> a -> a)
-> (a -> a -> a)
-> (a -> a)
-> (a -> a)
-> (a -> a)
-> (Integer -> a)
-> Num a
$c+ :: UnionDebt -> UnionDebt -> UnionDebt
+ :: UnionDebt -> UnionDebt -> UnionDebt
$c- :: UnionDebt -> UnionDebt -> UnionDebt
- :: UnionDebt -> UnionDebt -> UnionDebt
$c* :: UnionDebt -> UnionDebt -> UnionDebt
* :: UnionDebt -> UnionDebt -> UnionDebt
$cnegate :: UnionDebt -> UnionDebt
negate :: UnionDebt -> UnionDebt
$cabs :: UnionDebt -> UnionDebt
abs :: UnionDebt -> UnionDebt
$csignum :: UnionDebt -> UnionDebt
signum :: UnionDebt -> UnionDebt
$cfromInteger :: Integer -> UnionDebt
fromInteger :: Integer -> UnionDebt
Num)

{-# SPECIALISE remainingUnionDebt :: Table IO h -> IO UnionDebt #-}
-- | See 'Database.LSMTree.Normal.remainingUnionDebt'.
remainingUnionDebt ::
     (MonadSTM m, MonadMVar m, MonadThrow m, PrimMonad m)
  => Table m h -> m UnionDebt
remainingUnionDebt :: forall (m :: * -> *) h.
(MonadSTM m, MonadMVar m, MonadThrow m, PrimMonad m) =>
Table m h -> m UnionDebt
remainingUnionDebt Table m h
t = do
    Tracer m TableTrace -> TableTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith (Table m h -> Tracer m TableTrace
forall (m :: * -> *) h. Table m h -> Tracer m TableTrace
tableTracer Table m h
t) TableTrace
TraceRemainingUnionDebt
    Table m h -> (TableEnv m h -> m UnionDebt) -> m UnionDebt
forall (m :: * -> *) h a.
(MonadSTM m, MonadThrow m) =>
Table m h -> (TableEnv m h -> m a) -> m a
withOpenTable Table m h
t ((TableEnv m h -> m UnionDebt) -> m UnionDebt)
-> (TableEnv m h -> m UnionDebt) -> m UnionDebt
forall a b. (a -> b) -> a -> b
$ \TableEnv m h
tEnv -> do
      RWVar m (TableContent m h)
-> (TableContent m h -> m UnionDebt) -> m UnionDebt
forall (m :: * -> *) a b.
(MonadSTM m, MonadThrow m) =>
RWVar m a -> (a -> m b) -> m b
RW.withReadAccess (TableEnv m h -> RWVar m (TableContent m h)
forall (m :: * -> *) h. TableEnv m h -> RWVar m (TableContent m h)
tableContent TableEnv m h
tEnv) ((TableContent m h -> m UnionDebt) -> m UnionDebt)
-> (TableContent m h -> m UnionDebt) -> m UnionDebt
forall a b. (a -> b) -> a -> b
$ \TableContent m h
tableContent -> do
        case TableContent m h -> UnionLevel m h
forall (m :: * -> *) h. TableContent m h -> UnionLevel m h
tableUnionLevel TableContent m h
tableContent of
          UnionLevel m h
NoUnion ->
            UnionDebt -> m UnionDebt
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int -> UnionDebt
UnionDebt Int
0)
          Union Ref (MergingTree m h)
mt -> do
            (MergeDebt (MergeCredits Int
c), NumEntries
_) <- Ref (MergingTree m h) -> m (MergeDebt, NumEntries)
forall (m :: * -> *) h.
(MonadMVar m, PrimMonad m) =>
Ref (MergingTree m h) -> m (MergeDebt, NumEntries)
MT.remainingMergeDebt Ref (MergingTree m h)
mt
            UnionDebt -> m UnionDebt
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int -> UnionDebt
UnionDebt Int
c)

-- | See 'Database.LSMTree.Normal.UnionCredits'.
newtype UnionCredits = UnionCredits Int
  deriving newtype (Int -> UnionCredits -> ShowS
[UnionCredits] -> ShowS
UnionCredits -> String
(Int -> UnionCredits -> ShowS)
-> (UnionCredits -> String)
-> ([UnionCredits] -> ShowS)
-> Show UnionCredits
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> UnionCredits -> ShowS
showsPrec :: Int -> UnionCredits -> ShowS
$cshow :: UnionCredits -> String
show :: UnionCredits -> String
$cshowList :: [UnionCredits] -> ShowS
showList :: [UnionCredits] -> ShowS
Show, UnionCredits -> UnionCredits -> Bool
(UnionCredits -> UnionCredits -> Bool)
-> (UnionCredits -> UnionCredits -> Bool) -> Eq UnionCredits
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: UnionCredits -> UnionCredits -> Bool
== :: UnionCredits -> UnionCredits -> Bool
$c/= :: UnionCredits -> UnionCredits -> Bool
/= :: UnionCredits -> UnionCredits -> Bool
Eq, Eq UnionCredits
Eq UnionCredits =>
(UnionCredits -> UnionCredits -> Ordering)
-> (UnionCredits -> UnionCredits -> Bool)
-> (UnionCredits -> UnionCredits -> Bool)
-> (UnionCredits -> UnionCredits -> Bool)
-> (UnionCredits -> UnionCredits -> Bool)
-> (UnionCredits -> UnionCredits -> UnionCredits)
-> (UnionCredits -> UnionCredits -> UnionCredits)
-> Ord UnionCredits
UnionCredits -> UnionCredits -> Bool
UnionCredits -> UnionCredits -> Ordering
UnionCredits -> UnionCredits -> UnionCredits
forall a.
Eq a =>
(a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
$ccompare :: UnionCredits -> UnionCredits -> Ordering
compare :: UnionCredits -> UnionCredits -> Ordering
$c< :: UnionCredits -> UnionCredits -> Bool
< :: UnionCredits -> UnionCredits -> Bool
$c<= :: UnionCredits -> UnionCredits -> Bool
<= :: UnionCredits -> UnionCredits -> Bool
$c> :: UnionCredits -> UnionCredits -> Bool
> :: UnionCredits -> UnionCredits -> Bool
$c>= :: UnionCredits -> UnionCredits -> Bool
>= :: UnionCredits -> UnionCredits -> Bool
$cmax :: UnionCredits -> UnionCredits -> UnionCredits
max :: UnionCredits -> UnionCredits -> UnionCredits
$cmin :: UnionCredits -> UnionCredits -> UnionCredits
min :: UnionCredits -> UnionCredits -> UnionCredits
Ord, Integer -> UnionCredits
UnionCredits -> UnionCredits
UnionCredits -> UnionCredits -> UnionCredits
(UnionCredits -> UnionCredits -> UnionCredits)
-> (UnionCredits -> UnionCredits -> UnionCredits)
-> (UnionCredits -> UnionCredits -> UnionCredits)
-> (UnionCredits -> UnionCredits)
-> (UnionCredits -> UnionCredits)
-> (UnionCredits -> UnionCredits)
-> (Integer -> UnionCredits)
-> Num UnionCredits
forall a.
(a -> a -> a)
-> (a -> a -> a)
-> (a -> a -> a)
-> (a -> a)
-> (a -> a)
-> (a -> a)
-> (Integer -> a)
-> Num a
$c+ :: UnionCredits -> UnionCredits -> UnionCredits
+ :: UnionCredits -> UnionCredits -> UnionCredits
$c- :: UnionCredits -> UnionCredits -> UnionCredits
- :: UnionCredits -> UnionCredits -> UnionCredits
$c* :: UnionCredits -> UnionCredits -> UnionCredits
* :: UnionCredits -> UnionCredits -> UnionCredits
$cnegate :: UnionCredits -> UnionCredits
negate :: UnionCredits -> UnionCredits
$cabs :: UnionCredits -> UnionCredits
abs :: UnionCredits -> UnionCredits
$csignum :: UnionCredits -> UnionCredits
signum :: UnionCredits -> UnionCredits
$cfromInteger :: Integer -> UnionCredits
fromInteger :: Integer -> UnionCredits
Num)

{-# SPECIALISE supplyUnionCredits ::
     ResolveSerialisedValue -> Table IO h -> UnionCredits -> IO UnionCredits #-}
-- | See 'Database.LSMTree.Normal.supplyUnionCredits'.
supplyUnionCredits ::
     (MonadST m, MonadSTM m, MonadMVar m, MonadMask m)
  => ResolveSerialisedValue -> Table m h -> UnionCredits -> m UnionCredits
supplyUnionCredits :: forall (m :: * -> *) h.
(MonadST m, MonadSTM m, MonadMVar m, MonadMask m) =>
ResolveSerialisedValue
-> Table m h -> UnionCredits -> m UnionCredits
supplyUnionCredits ResolveSerialisedValue
resolve Table m h
t UnionCredits
credits = do
    Tracer m TableTrace -> TableTrace -> m ()
forall (m :: * -> *) a. Monad m => Tracer m a -> a -> m ()
traceWith (Table m h -> Tracer m TableTrace
forall (m :: * -> *) h. Table m h -> Tracer m TableTrace
tableTracer Table m h
t) (TableTrace -> m ()) -> TableTrace -> m ()
forall a b. (a -> b) -> a -> b
$ UnionCredits -> TableTrace
TraceSupplyUnionCredits UnionCredits
credits
    Table m h -> (TableEnv m h -> m UnionCredits) -> m UnionCredits
forall (m :: * -> *) h a.
(MonadSTM m, MonadThrow m) =>
Table m h -> (TableEnv m h -> m a) -> m a
withOpenTable Table m h
t ((TableEnv m h -> m UnionCredits) -> m UnionCredits)
-> (TableEnv m h -> m UnionCredits) -> m UnionCredits
forall a b. (a -> b) -> a -> b
$ \TableEnv m h
tEnv -> do
      -- No need to mutate the table content here. In the rare case that we want
      -- to move a completed union level into the regular levels, we can still
      -- take the write lock for that.
      RWVar m (TableContent m h)
-> (TableContent m h -> m UnionCredits) -> m UnionCredits
forall (m :: * -> *) a b.
(MonadSTM m, MonadThrow m) =>
RWVar m a -> (a -> m b) -> m b
RW.withReadAccess (TableEnv m h -> RWVar m (TableContent m h)
forall (m :: * -> *) h. TableEnv m h -> RWVar m (TableContent m h)
tableContent TableEnv m h
tEnv) ((TableContent m h -> m UnionCredits) -> m UnionCredits)
-> (TableContent m h -> m UnionCredits) -> m UnionCredits
forall a b. (a -> b) -> a -> b
$ \TableContent m h
tableContent -> do
        case TableContent m h -> UnionLevel m h
forall (m :: * -> *) h. TableContent m h -> UnionLevel m h
tableUnionLevel TableContent m h
tableContent of
          UnionLevel m h
NoUnion ->
            UnionCredits -> m UnionCredits
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (UnionCredits -> UnionCredits -> UnionCredits
forall a. Ord a => a -> a -> a
max UnionCredits
0 UnionCredits
credits)  -- all leftovers (but never negative)
          Union Ref (MergingTree m h)
mt -> do
            let conf :: TableConfig
conf = Table m h -> TableConfig
forall (m :: * -> *) h. Table m h -> TableConfig
tableConfig Table m h
t
            let AllocNumEntries (NumEntries Int
x) = TableConfig -> WriteBufferAlloc
confWriteBufferAlloc TableConfig
conf
            -- We simply use the write buffer size as merge credit threshold, as
            -- the regular level merges also do.
            -- TODO: pick a more suitable threshold or make configurable?
            let thresh :: CreditThreshold
thresh = UnspentCredits -> CreditThreshold
MR.CreditThreshold (MergeCredits -> UnspentCredits
MR.UnspentCredits (Int -> MergeCredits
MergeCredits Int
x))
            MergeCredits Int
leftovers <-
              HasFS m h
-> HasBlockIO m h
-> ResolveSerialisedValue
-> RunParams
-> CreditThreshold
-> SessionRoot
-> UniqCounter m
-> Ref (MergingTree m h)
-> MergeCredits
-> m MergeCredits
forall (m :: * -> *) h.
(MonadMVar m, MonadST m, MonadSTM m, MonadMask m) =>
HasFS m h
-> HasBlockIO m h
-> ResolveSerialisedValue
-> RunParams
-> CreditThreshold
-> SessionRoot
-> UniqCounter m
-> Ref (MergingTree m h)
-> MergeCredits
-> m MergeCredits
MT.supplyCredits
                (TableEnv m h -> HasFS m h
forall (m :: * -> *) h. TableEnv m h -> HasFS m h
tableHasFS TableEnv m h
tEnv)
                (TableEnv m h -> HasBlockIO m h
forall (m :: * -> *) h. TableEnv m h -> HasBlockIO m h
tableHasBlockIO TableEnv m h
tEnv)
                ResolveSerialisedValue
resolve
                (TableConfig -> RunLevelNo -> RunParams
runParamsForLevel TableConfig
conf RunLevelNo
UnionLevel)
                CreditThreshold
thresh
                (TableEnv m h -> SessionRoot
forall (m :: * -> *) h. TableEnv m h -> SessionRoot
tableSessionRoot TableEnv m h
tEnv)
                (TableEnv m h -> UniqCounter m
forall (m :: * -> *) h. TableEnv m h -> UniqCounter m
tableSessionUniqCounter TableEnv m h
tEnv)
                Ref (MergingTree m h)
mt
                (let UnionCredits Int
c = UnionCredits
credits in Int -> MergeCredits
MergeCredits Int
c)
            UnionCredits -> m UnionCredits
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int -> UnionCredits
UnionCredits Int
leftovers)