{-# LANGUAGE DataKinds           #-}
{-# LANGUAGE DerivingStrategies  #-}
{-# LANGUAGE FlexibleContexts    #-}
{-# LANGUAGE GADTs               #-}
{-# LANGUAGE LambdaCase          #-}
{-# LANGUAGE NumericUnderscores  #-}
{-# LANGUAGE OverloadedStrings   #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Plutus.ChainIndex.Events where

import Cardano.BM.Trace (Trace)
import Control.Concurrent (threadDelay)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TBMQueue (flushTBMQueue, isFullTBMQueue)
import Control.Monad (forever, void)
import Data.Maybe (catMaybes, isJust)
import Numeric.Natural (Natural)
import Plutus.ChainIndex qualified as CI
import Plutus.ChainIndex.Lib (ChainSyncEvent (Resume, RollBackward, RollForward), EventsQueue, RunRequirements,
                              runChainIndexDuringSync)
import Plutus.ChainIndex.SyncStats (SyncLog, getSyncState, isSyncStateSynced, logProgress)
import Plutus.ChainIndex.Types (tipAsPoint)
import Plutus.Monitoring.Util (PrettyObject (PrettyObject), convertLog, runLogEffects)
import System.Clock (Clock (Monotonic), diffTimeSpec, getTime)

-- | How often the events queue is polled, in microseconds (the unit expected
-- by 'threadDelay'). The current value corresponds to 2 seconds.
period :: Int
period = 2_000_000 -- 2s

-- | Estimate the size of an event by the number of transactions in its block.
--
-- By weighing events this way we accumulate some number of blocks whose total
-- number of transactions stays below the queue limit, which helps to process
-- blocks with a constant memory usage.
--
-- Simply accumulating a fixed number of blocks doesn't work, as a block can
-- hold any number of transactions. It works fine at the beginning of the chain,
-- but later blocks grow in size and the memory usage grows tremendously.
--
-- However, once we are in sync with the node, we want to process every block
-- instead of batches of blocks, so that the database is updated as frequently
-- as possible. In that case, and for every event that is not a 'RollForward'
-- (resume and rollback), the reported size is @maxQueueSize + 1@.
--
-- NOTE(review): presumably @maxQueueSize@ is the capacity the queue was created
-- with, so a size of @maxQueueSize + 1@ makes the queue count as full right
-- away -- confirm against the code that creates the queue.
measureEventQueueSizeByTxs :: Natural -> ChainSyncEvent -> Natural
measureEventQueueSizeByTxs maxQueueSize (RollForward (CI.Block syncTip transactions) nodeTip)
  -- Synced with the node: report an oversized event so it is not batched.
  | isSyncStateSynced syncState = maxQueueSize + 1
  -- Still catching up: the event weighs as much as its transaction count.
  | otherwise = txLen
  where
    -- Sync state derived from the tip of this block and the tip of the node.
    syncState = getSyncState (tipAsPoint syncTip) (tipAsPoint nodeTip)
    txLen = fromIntegral $ length transactions
measureEventQueueSizeByTxs maxQueueSize _ = maxQueueSize + 1 -- to handle resume and rollback asap

-- | 'processEventsQueue' repeatedly drains the events 'TBMQueue' and applies
-- the drained events to the chain index, appending consecutive 'RollForward'
-- blocks at once.
--
-- Each iteration of the (never terminating) loop:
--
--   1. polls the queue every 'period' microseconds until 'isFullTBMQueue'
--      reports it as full, then flushes all of its events;
--   2. processes the flushed events in order (see @processEvents@);
--   3. logs progress through 'logProgress', passing the processed events and
--      the time elapsed for the iteration, measured with the 'Monotonic' clock.
processEventsQueue :: Trace IO (PrettyObject SyncLog) -> RunRequirements -> EventsQueue -> IO ()
processEventsQueue trace runReq eventsQueue = forever $ do
  start <- getTime Monotonic
  eventsToProcess <- waitUntilEvents
  processEvents eventsToProcess
  end <- getTime Monotonic
  void $ runLogEffects (convertLog PrettyObject trace) $ logProgress eventsToProcess (diffTimeSpec end start)
  where
    -- Poll until the queue is full, then take everything that is in it.
    -- The fullness check and the flush run in two separate transactions.
    waitUntilEvents :: IO [ChainSyncEvent]
    waitUntilEvents = do
      isFull <- atomically $ isFullTBMQueue eventsQueue
      if isFull
        then atomically $ flushTBMQueue eventsQueue
        else threadDelay period >> waitUntilEvents

    -- Apply the events in order. The result of every 'runChainIndexDuringSync'
    -- call is discarded.
    -- NOTE(review): whether a failed run should stop the batch cannot be told
    -- from this module -- confirm against 'runChainIndexDuringSync'.
    processEvents :: [ChainSyncEvent] -> IO ()
    processEvents [] = pure ()
    processEvents (Resume resumePoint : restEvents) = do
      void $ runChainIndexDuringSync runReq $ CI.resumeSync resumePoint
      processEvents restEvents
    processEvents (RollBackward backwardPoint _ : restEvents) = do
      void $ runChainIndexDuringSync runReq $ CI.rollback backwardPoint
      processEvents restEvents
    processEvents events@(RollForward _ _ : _) = do
      let getBlock = \case
            (RollForward block _) -> Just block
            _                     -> Nothing
          isRollForwardEvt = isJust . getBlock
          -- Take the longest prefix of 'RollForward' events (including the
          -- head) so that their blocks are appended with a single call.
          (rollForwardEvents, restEvents') = span isRollForwardEvt events
          blocks = catMaybes $ map getBlock rollForwardEvents
      void $ runChainIndexDuringSync runReq $ CI.appendBlocks blocks
      processEvents restEvents'