{-# LANGUAGE DataKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE TypeFamilies #-}
module Cardano.Protocol.Socket.Mock.Client where
import Data.ByteString.Lazy qualified as LBS
import Data.Time.Units (Second, TimeUnit, toMicroseconds)
import Data.Void (Void)
import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad.Catch (catchAll)
import Control.Tracer
import Cardano.Api qualified as C
import Ouroboros.Network.Block (Point (..))
import Ouroboros.Network.Protocol.ChainSync.Client qualified as ChainSync
import Cardano.Node.Emulator.Internal.Node (SlotConfig, currentSlot)
import Ouroboros.Network.IOManager
import Ouroboros.Network.Mux
import Ouroboros.Network.NodeToClient (NodeToClientProtocols (..), connectTo, versionedNodeToClientProtocols)
import Ouroboros.Network.Snocket
import Ouroboros.Network.Socket
import Cardano.Node.Socket.Emulator.Types (Tip, chainSyncCodec, doNothingInitiatorProtocol, epochSlots,
fromCardanoBlock, nodeToClientVersion, nodeToClientVersionData)
import Cardano.Protocol.Socket.Client (ChainSyncHandle (..))
import Ledger (Block, Slot (..))
newtype TxSendHandle = TxSendHandle
{ TxSendHandle -> TQueue (Tx BabbageEra)
tshQueue :: TQueue (C.Tx C.BabbageEra) }
queueTx ::
TxSendHandle
-> C.Tx C.BabbageEra
-> IO ()
queueTx :: TxSendHandle -> Tx BabbageEra -> IO ()
queueTx TxSendHandle { TQueue (Tx BabbageEra)
tshQueue :: TQueue (Tx BabbageEra)
tshQueue :: TxSendHandle -> TQueue (Tx BabbageEra)
tshQueue } Tx BabbageEra
tx =
STM () -> IO ()
forall a. STM a -> IO a
atomically (TQueue (Tx BabbageEra) -> Tx BabbageEra -> STM ()
forall a. TQueue a -> a -> STM ()
writeTQueue TQueue (Tx BabbageEra)
tshQueue Tx BabbageEra
tx)
getCurrentSlot :: ChainSyncHandle Block -> IO Slot
getCurrentSlot :: ChainSyncHandle Block -> IO Slot
getCurrentSlot = ChainSyncHandle Block -> IO Slot
forall event. ChainSyncHandle event -> IO Slot
cshCurrentSlot
runChainSync' :: FilePath
-> SlotConfig
-> IO (ChainSyncHandle Block)
runChainSync' :: FilePath -> SlotConfig -> IO (ChainSyncHandle Block)
runChainSync' FilePath
socketPath SlotConfig
slotConfig =
FilePath
-> SlotConfig
-> (Block -> Slot -> IO ())
-> IO (ChainSyncHandle Block)
runChainSync FilePath
socketPath SlotConfig
slotConfig (\Block
_ Slot
_ -> () -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ())
runChainSync :: FilePath
-> SlotConfig
-> (Block -> Slot -> IO ())
-> IO (ChainSyncHandle Block)
runChainSync :: FilePath
-> SlotConfig
-> (Block -> Slot -> IO ())
-> IO (ChainSyncHandle Block)
runChainSync FilePath
socketPath SlotConfig
slotConfig Block -> Slot -> IO ()
onNewBlock = do
let handle :: ChainSyncHandle Block
handle = ChainSyncHandle :: forall event.
IO Slot -> (event -> Slot -> IO ()) -> ChainSyncHandle event
ChainSyncHandle { cshCurrentSlot :: IO Slot
cshCurrentSlot = SlotConfig -> IO Slot
currentSlot SlotConfig
slotConfig
, cshHandler :: Block -> Slot -> IO ()
cshHandler = Block -> Slot -> IO ()
onNewBlock
}
ThreadId
_ <- IO () -> IO ThreadId
forkIO (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ (IOManager -> IO ()) -> IO ()
WithIOManager
withIOManager ((IOManager -> IO ()) -> IO ()) -> (IOManager -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ Second -> ChainSyncHandle Block -> IOManager -> IO ()
forall a.
TimeUnit a =>
a -> ChainSyncHandle Block -> IOManager -> IO ()
loop (Second
1 :: Second) ChainSyncHandle Block
handle
ChainSyncHandle Block -> IO (ChainSyncHandle Block)
forall (f :: * -> *) a. Applicative f => a -> f a
pure ChainSyncHandle Block
handle
where
loop :: TimeUnit a => a -> ChainSyncHandle Block -> IOManager -> IO ()
loop :: a -> ChainSyncHandle Block -> IOManager -> IO ()
loop a
timeout ch :: ChainSyncHandle Block
ch@ChainSyncHandle{ Block -> Slot -> IO ()
cshHandler :: Block -> Slot -> IO ()
cshHandler :: forall event. ChainSyncHandle event -> event -> Slot -> IO ()
cshHandler } IOManager
iocp = do
IO () -> (SomeException -> IO ()) -> IO ()
forall (m :: * -> *) a.
MonadCatch m =>
m a -> (SomeException -> m a) -> m a
catchAll
(LocalSnocket
-> NetworkConnectTracers LocalAddress NodeToClientVersion
-> Versions
NodeToClientVersion
NodeToClientVersionData
(OuroborosApplication
'InitiatorMode LocalAddress ByteString IO () Void)
-> FilePath
-> IO ()
forall a b.
LocalSnocket
-> NetworkConnectTracers LocalAddress NodeToClientVersion
-> Versions
NodeToClientVersion
NodeToClientVersionData
(OuroborosApplication
'InitiatorMode LocalAddress ByteString IO a b)
-> FilePath
-> IO ()
connectTo
(IOManager -> LocalSnocket
localSnocket IOManager
iocp)
NetworkConnectTracers LocalAddress NodeToClientVersion
forall addr vNumber. NetworkConnectTracers addr vNumber
nullNetworkConnectTracers
(NodeToClientVersion
-> NodeToClientVersionData
-> (ConnectionId LocalAddress
-> STM IO ControlMessage
-> NodeToClientProtocols 'InitiatorMode ByteString IO () Void)
-> Versions
NodeToClientVersion
NodeToClientVersionData
(OuroborosApplication
'InitiatorMode LocalAddress ByteString IO () Void)
forall (m :: * -> *) (appType :: MuxMode) bytes a b.
NodeToClientVersion
-> NodeToClientVersionData
-> (ConnectionId LocalAddress
-> STM m ControlMessage
-> NodeToClientProtocols appType bytes m a b)
-> Versions
NodeToClientVersion
NodeToClientVersionData
(OuroborosApplication appType LocalAddress bytes m a b)
versionedNodeToClientProtocols
NodeToClientVersion
nodeToClientVersion
NodeToClientVersionData
nodeToClientVersionData
(\ConnectionId LocalAddress
_ STM IO ControlMessage
_ -> (Block -> Slot -> IO ())
-> NodeToClientProtocols 'InitiatorMode ByteString IO () Void
nodeToClientProtocols Block -> Slot -> IO ()
cshHandler))
FilePath
socketPath)
(\SomeException
_ -> do
Int -> IO ()
threadDelay (Integer -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Integer -> Int) -> Integer -> Int
forall a b. (a -> b) -> a -> b
$ a -> Integer
forall a. TimeUnit a => a -> Integer
toMicroseconds a
timeout)
a -> ChainSyncHandle Block -> IOManager -> IO ()
forall a.
TimeUnit a =>
a -> ChainSyncHandle Block -> IOManager -> IO ()
loop a
timeout ChainSyncHandle Block
ch IOManager
iocp)
nodeToClientProtocols
:: (Block -> Slot -> IO ())
-> NodeToClientProtocols 'InitiatorMode LBS.ByteString IO () Void
nodeToClientProtocols :: (Block -> Slot -> IO ())
-> NodeToClientProtocols 'InitiatorMode ByteString IO () Void
nodeToClientProtocols Block -> Slot -> IO ()
blockHandler =
NodeToClientProtocols :: forall (appType :: MuxMode) bytes (m :: * -> *) a b.
RunMiniProtocol appType bytes m a b
-> RunMiniProtocol appType bytes m a b
-> RunMiniProtocol appType bytes m a b
-> RunMiniProtocol appType bytes m a b
-> NodeToClientProtocols appType bytes m a b
NodeToClientProtocols
{ localChainSyncProtocol :: RunMiniProtocol 'InitiatorMode ByteString IO () Void
localChainSyncProtocol = (Block -> Slot -> IO ())
-> RunMiniProtocol 'InitiatorMode ByteString IO () Void
chainSync Block -> Slot -> IO ()
blockHandler
, localTxSubmissionProtocol :: RunMiniProtocol 'InitiatorMode ByteString IO () Void
localTxSubmissionProtocol = RunMiniProtocol 'InitiatorMode ByteString IO () Void
forall (m :: * -> *) a.
MonadTimer m =>
RunMiniProtocol 'InitiatorMode ByteString m a Void
doNothingInitiatorProtocol
, localStateQueryProtocol :: RunMiniProtocol 'InitiatorMode ByteString IO () Void
localStateQueryProtocol = RunMiniProtocol 'InitiatorMode ByteString IO () Void
forall (m :: * -> *) a.
MonadTimer m =>
RunMiniProtocol 'InitiatorMode ByteString m a Void
doNothingInitiatorProtocol
, localTxMonitorProtocol :: RunMiniProtocol 'InitiatorMode ByteString IO () Void
localTxMonitorProtocol = RunMiniProtocol 'InitiatorMode ByteString IO () Void
forall (m :: * -> *) a.
MonadTimer m =>
RunMiniProtocol 'InitiatorMode ByteString m a Void
doNothingInitiatorProtocol
}
chainSync :: (Block -> Slot -> IO ())
-> RunMiniProtocol 'InitiatorMode LBS.ByteString IO () Void
chainSync :: (Block -> Slot -> IO ())
-> RunMiniProtocol 'InitiatorMode ByteString IO () Void
chainSync Block -> Slot -> IO ()
onNewBlock' =
MuxPeer ByteString IO ()
-> RunMiniProtocol 'InitiatorMode ByteString IO () Void
forall bytes (m :: * -> *) a.
MuxPeer bytes m a -> RunMiniProtocol 'InitiatorMode bytes m a Void
InitiatorProtocolOnly (MuxPeer ByteString IO ()
-> RunMiniProtocol 'InitiatorMode ByteString IO () Void)
-> MuxPeer ByteString IO ()
-> RunMiniProtocol 'InitiatorMode ByteString IO () Void
forall a b. (a -> b) -> a -> b
$
Tracer
IO
(TraceSendRecv
(ChainSync
(CardanoBlock StandardCrypto)
(Point (CardanoBlock StandardCrypto))
Tip))
-> Codec
(ChainSync
(CardanoBlock StandardCrypto)
(Point (CardanoBlock StandardCrypto))
Tip)
DeserialiseFailure
IO
ByteString
-> Peer
(ChainSync
(CardanoBlock StandardCrypto)
(Point (CardanoBlock StandardCrypto))
Tip)
'AsClient
'StIdle
IO
()
-> MuxPeer ByteString IO ()
forall (pr :: PeerRole) ps (st :: ps) failure bytes (m :: * -> *)
a.
(Show failure, forall (st' :: ps). Show (ClientHasAgency st'),
forall (st' :: ps). Show (ServerHasAgency st'), ShowProxy ps) =>
Tracer m (TraceSendRecv ps)
-> Codec ps failure m bytes
-> Peer ps pr st m a
-> MuxPeer bytes m a
MuxPeer
Tracer
IO
(TraceSendRecv
(ChainSync
(CardanoBlock StandardCrypto)
(Point (CardanoBlock StandardCrypto))
Tip))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer
Codec
(ChainSync
(CardanoBlock StandardCrypto)
(Point (CardanoBlock StandardCrypto))
Tip)
DeserialiseFailure
IO
ByteString
forall block.
(block ~ CardanoBlock StandardCrypto) =>
Codec
(ChainSync block (Point block) Tip)
DeserialiseFailure
IO
ByteString
chainSyncCodec
(ChainSyncClient
(CardanoBlock StandardCrypto)
(Point (CardanoBlock StandardCrypto))
Tip
IO
()
-> Peer
(ChainSync
(CardanoBlock StandardCrypto)
(Point (CardanoBlock StandardCrypto))
Tip)
'AsClient
'StIdle
IO
()
forall header point tip (m :: * -> *) a.
Monad m =>
ChainSyncClient header point tip m a
-> Peer (ChainSync header point tip) 'AsClient 'StIdle m a
ChainSync.chainSyncClientPeer
(SlotConfig
-> (CardanoBlock StandardCrypto -> Slot -> IO ())
-> ChainSyncClient
(CardanoBlock StandardCrypto)
(Point (CardanoBlock StandardCrypto))
Tip
IO
()
forall block.
SlotConfig
-> (block -> Slot -> IO ())
-> ChainSyncClient block (Point block) Tip IO ()
chainSyncClient SlotConfig
slotConfig (Block -> Slot -> IO ()
onNewBlock' (Block -> Slot -> IO ())
-> (CardanoBlock StandardCrypto -> Block)
-> CardanoBlock StandardCrypto
-> Slot
-> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. CardanoBlock StandardCrypto -> Block
fromCardanoBlock)))
chainSyncClient :: forall block. SlotConfig
-> (block -> Slot -> IO ())
-> ChainSync.ChainSyncClient block (Point block) Tip IO ()
chainSyncClient :: SlotConfig
-> (block -> Slot -> IO ())
-> ChainSyncClient block (Point block) Tip IO ()
chainSyncClient SlotConfig
slotConfig block -> Slot -> IO ()
onNewBlock =
IO (ClientStIdle block (Point block) Tip IO ())
-> ChainSyncClient block (Point block) Tip IO ()
forall header point tip (m :: * -> *) a.
m (ClientStIdle header point tip m a)
-> ChainSyncClient header point tip m a
ChainSync.ChainSyncClient (IO (ClientStIdle block (Point block) Tip IO ())
-> ChainSyncClient block (Point block) Tip IO ())
-> IO (ClientStIdle block (Point block) Tip IO ())
-> ChainSyncClient block (Point block) Tip IO ()
forall a b. (a -> b) -> a -> b
$ ClientStIdle block (Point block) Tip IO ()
-> IO (ClientStIdle block (Point block) Tip IO ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure ClientStIdle block (Point block) Tip IO ()
requestNext
where
requestNext :: ChainSync.ClientStIdle block (Point block) Tip IO ()
requestNext :: ClientStIdle block (Point block) Tip IO ()
requestNext =
ClientStNext block (Point block) Tip IO ()
-> IO (ClientStNext block (Point block) Tip IO ())
-> ClientStIdle block (Point block) Tip IO ()
forall header point tip (m :: * -> *) a.
ClientStNext header point tip m a
-> m (ClientStNext header point tip m a)
-> ClientStIdle header point tip m a
ChainSync.SendMsgRequestNext
ClientStNext block (Point block) Tip IO ()
handleNext
(ClientStNext block (Point block) Tip IO ()
-> IO (ClientStNext block (Point block) Tip IO ())
forall (m :: * -> *) a. Monad m => a -> m a
return ClientStNext block (Point block) Tip IO ()
handleNext)
handleNext :: ChainSync.ClientStNext block (Point block) Tip IO ()
handleNext :: ClientStNext block (Point block) Tip IO ()
handleNext =
ClientStNext :: forall header point tip (m :: * -> *) a.
(header -> tip -> ChainSyncClient header point tip m a)
-> (point -> tip -> ChainSyncClient header point tip m a)
-> ClientStNext header point tip m a
ChainSync.ClientStNext
{
recvMsgRollForward :: block -> Tip -> ChainSyncClient block (Point block) Tip IO ()
ChainSync.recvMsgRollForward = \block
block Tip
_ ->
IO (ClientStIdle block (Point block) Tip IO ())
-> ChainSyncClient block (Point block) Tip IO ()
forall header point tip (m :: * -> *) a.
m (ClientStIdle header point tip m a)
-> ChainSyncClient header point tip m a
ChainSync.ChainSyncClient (IO (ClientStIdle block (Point block) Tip IO ())
-> ChainSyncClient block (Point block) Tip IO ())
-> IO (ClientStIdle block (Point block) Tip IO ())
-> ChainSyncClient block (Point block) Tip IO ()
forall a b. (a -> b) -> a -> b
$ do
Slot
slot <- SlotConfig -> IO Slot
currentSlot SlotConfig
slotConfig
block -> Slot -> IO ()
onNewBlock block
block Slot
slot
ClientStIdle block (Point block) Tip IO ()
-> IO (ClientStIdle block (Point block) Tip IO ())
forall (m :: * -> *) a. Monad m => a -> m a
return ClientStIdle block (Point block) Tip IO ()
requestNext
, recvMsgRollBackward :: Point block -> Tip -> ChainSyncClient block (Point block) Tip IO ()
ChainSync.recvMsgRollBackward = FilePath
-> Point block
-> Tip
-> ChainSyncClient block (Point block) Tip IO ()
forall a. HasCallStack => FilePath -> a
error FilePath
"Not supported."
}
runTxSender :: FilePath
-> C.NetworkId
-> IO TxSendHandle
runTxSender :: FilePath -> NetworkId -> IO TxSendHandle
runTxSender FilePath
socketPath NetworkId
networkId = do
TQueue (Tx BabbageEra)
inputQueue <- IO (TQueue (Tx BabbageEra))
forall a. IO (TQueue a)
newTQueueIO
let handle :: TxSendHandle
handle = TxSendHandle :: TQueue (Tx BabbageEra) -> TxSendHandle
TxSendHandle { tshQueue :: TQueue (Tx BabbageEra)
tshQueue = TQueue (Tx BabbageEra)
inputQueue }
ThreadId
_ <- IO () -> IO ThreadId
forkIO (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ (IOManager -> IO ()) -> IO ()
WithIOManager
withIOManager ((IOManager -> IO ()) -> IO ()) -> (IOManager -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ Second -> TxSendHandle -> IOManager -> IO ()
forall a. TimeUnit a => a -> TxSendHandle -> IOManager -> IO ()
loop (Second
1 :: Second) TxSendHandle
handle
TxSendHandle -> IO TxSendHandle
forall (f :: * -> *) a. Applicative f => a -> f a
pure TxSendHandle
handle
where
loop :: TimeUnit a => a -> TxSendHandle -> IOManager -> IO ()
loop :: a -> TxSendHandle -> IOManager -> IO ()
loop a
timeout ch :: TxSendHandle
ch@TxSendHandle{ TQueue (Tx BabbageEra)
tshQueue :: TQueue (Tx BabbageEra)
tshQueue :: TxSendHandle -> TQueue (Tx BabbageEra)
tshQueue } IOManager
iocp = do
Tx BabbageEra
tx <- STM (Tx BabbageEra) -> IO (Tx BabbageEra)
forall a. STM a -> IO a
atomically (STM (Tx BabbageEra) -> IO (Tx BabbageEra))
-> STM (Tx BabbageEra) -> IO (Tx BabbageEra)
forall a b. (a -> b) -> a -> b
$ TQueue (Tx BabbageEra) -> STM (Tx BabbageEra)
forall a. TQueue a -> STM a
readTQueue TQueue (Tx BabbageEra)
tshQueue
SubmitResult (TxValidationErrorInMode CardanoMode)
_ <- LocalNodeConnectInfo CardanoMode
-> TxInMode CardanoMode
-> IO (SubmitResult (TxValidationErrorInMode CardanoMode))
forall mode.
LocalNodeConnectInfo mode
-> TxInMode mode
-> IO (SubmitResult (TxValidationErrorInMode mode))
C.submitTxToNodeLocal LocalNodeConnectInfo CardanoMode
localNodeConnectInfo (TxInMode CardanoMode
-> IO (SubmitResult (TxValidationErrorInMode CardanoMode)))
-> TxInMode CardanoMode
-> IO (SubmitResult (TxValidationErrorInMode CardanoMode))
forall a b. (a -> b) -> a -> b
$ Tx BabbageEra
-> EraInMode BabbageEra CardanoMode -> TxInMode CardanoMode
forall era mode. Tx era -> EraInMode era mode -> TxInMode mode
C.TxInMode Tx BabbageEra
tx EraInMode BabbageEra CardanoMode
C.BabbageEraInCardanoMode
Int -> IO ()
threadDelay (Integer -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Integer -> Int) -> Integer -> Int
forall a b. (a -> b) -> a -> b
$ a -> Integer
forall a. TimeUnit a => a -> Integer
toMicroseconds a
timeout)
a -> TxSendHandle -> IOManager -> IO ()
forall a. TimeUnit a => a -> TxSendHandle -> IOManager -> IO ()
loop a
timeout TxSendHandle
ch IOManager
iocp
localNodeConnectInfo :: LocalNodeConnectInfo CardanoMode
localNodeConnectInfo = LocalNodeConnectInfo :: forall mode.
ConsensusModeParams mode
-> NetworkId -> FilePath -> LocalNodeConnectInfo mode
C.LocalNodeConnectInfo {
localConsensusModeParams :: ConsensusModeParams CardanoMode
C.localConsensusModeParams = EpochSlots -> ConsensusModeParams CardanoMode
C.CardanoModeParams EpochSlots
epochSlots,
localNodeNetworkId :: NetworkId
C.localNodeNetworkId = NetworkId
networkId,
localNodeSocketPath :: FilePath
C.localNodeSocketPath = FilePath
socketPath }