{-# LANGUAGE DataKinds           #-}
{-# LANGUAGE FlexibleContexts    #-}
{-# LANGUAGE GADTs               #-}
{-# LANGUAGE KindSignatures      #-}
{-# LANGUAGE NamedFieldPuns      #-}
{-# LANGUAGE OverloadedStrings   #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications    #-}
{-# LANGUAGE TypeOperators       #-}
{-

Handlers for the websockets exposed by the PAB.

-}
module Plutus.PAB.Webserver.WebSocket
    ( wsHandler
    , combinedWebsocket
    , contractInstanceUpdates
    -- * Reports
    , getContractReport
    -- ** Streams of PAB events
    , openEndpoints
    , slotChange
    , observableStateChange
    ) where

import Cardano.Wallet.LocalClient.ExportTx (ExportTx)
import Control.Concurrent.Async (Async, async, waitAnyCancel)
import Control.Concurrent.STM (STM)
import Control.Concurrent.STM qualified as STM
import Control.Concurrent.STM.Extras.Stream (STMStream, foldM, singleton, unfold, unfoldOn)
import Control.Exception (SomeException, handle)
import Control.Monad (forever, void)
import Control.Monad.Freer.Error (throwError)
import Control.Monad.IO.Class (liftIO)
import Data.Aeson (ToJSON)
import Data.Aeson qualified as JSON
import Data.Bifunctor (Bifunctor (first))
import Data.Foldable (fold)
import Data.Map qualified as Map
import Data.Proxy (Proxy (Proxy))
import Data.Set (Set)
import Data.Set qualified as Set
import Data.Text (Text)
import Data.Text qualified as Text
import Ledger (PubKeyHash)
import Ledger.Slot (Slot)
import Network.WebSockets qualified as WS
import Network.WebSockets.Connection (Connection, PendingConnection)
import Plutus.Contract.Effects (ActiveEndpoint)
import Plutus.PAB.Core (PABAction)
import Plutus.PAB.Core qualified as Core
import Plutus.PAB.Core.ContractInstance.STM (BlockchainEnv, OpenEndpoint (oepName))
import Plutus.PAB.Core.ContractInstance.STM qualified as Instances
import Plutus.PAB.Effects.Contract qualified as Contract
import Plutus.PAB.Events.ContractInstanceState (fromResp)
import Plutus.PAB.Types (PABError (OtherError))
import Plutus.PAB.Webserver.API ()
import Plutus.PAB.Webserver.Types (CombinedWSStreamToClient (InstanceUpdate, SlotChange),
                                   CombinedWSStreamToServer (Subscribe, Unsubscribe),
                                   ContractReport (ContractReport, crActiveContractStates, crAvailableContracts),
                                   ContractSignatureResponse (ContractSignatureResponse),
                                   InstanceStatusToClient (ContractFinished, NewActiveEndpoints, NewObservableState, NewYieldedExportTxs))
import Servant ((:<|>) ((:<|>)))
import Wallet.Types (ContractInstanceId)

getContractReport :: forall t env. Contract.PABContract t => PABAction t env (ContractReport (Contract.ContractDef t))
getContractReport :: PABAction t env (ContractReport (ContractDef t))
getContractReport = do
    [ContractDef t]
availableContracts <- forall (effs :: [* -> *]).
Member (ContractDefinition t) effs =>
Eff effs [ContractDef t]
forall t (effs :: [* -> *]).
Member (ContractDefinition t) effs =>
Eff effs [ContractDef t]
Contract.getDefinitions @t
    [ContractInstanceId]
activeContractIDs <- ((ContractInstanceId, ContractActivationArgs (ContractDef t))
 -> ContractInstanceId)
-> [(ContractInstanceId, ContractActivationArgs (ContractDef t))]
-> [ContractInstanceId]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (ContractInstanceId, ContractActivationArgs (ContractDef t))
-> ContractInstanceId
forall a b. (a, b) -> a
fst ([(ContractInstanceId, ContractActivationArgs (ContractDef t))]
 -> [ContractInstanceId])
-> (Map ContractInstanceId (ContractActivationArgs (ContractDef t))
    -> [(ContractInstanceId, ContractActivationArgs (ContractDef t))])
-> Map ContractInstanceId (ContractActivationArgs (ContractDef t))
-> [ContractInstanceId]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Map ContractInstanceId (ContractActivationArgs (ContractDef t))
-> [(ContractInstanceId, ContractActivationArgs (ContractDef t))]
forall k a. Map k a -> [(k, a)]
Map.toList (Map ContractInstanceId (ContractActivationArgs (ContractDef t))
 -> [ContractInstanceId])
-> Eff
     (PABEffects t env)
     (Map ContractInstanceId (ContractActivationArgs (ContractDef t)))
-> Eff (PABEffects t env) [ContractInstanceId]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall (effs :: [* -> *]).
Member (ContractStore t) effs =>
Eff
  effs
  (Map ContractInstanceId (ContractActivationArgs (ContractDef t)))
forall t (effs :: [* -> *]).
Member (ContractStore t) effs =>
Eff
  effs
  (Map ContractInstanceId (ContractActivationArgs (ContractDef t)))
Contract.getActiveContracts @t
    [ContractSignatureResponse (ContractDef t)]
crAvailableContracts <- [ContractSignatureResponse (ContractDef t)]
-> Eff
     (PABEffects t env) [ContractSignatureResponse (ContractDef t)]
forall (f :: * -> *) a. Applicative f => a -> f a
pure ([ContractSignatureResponse (ContractDef t)]
 -> Eff
      (PABEffects t env) [ContractSignatureResponse (ContractDef t)])
-> [ContractSignatureResponse (ContractDef t)]
-> Eff
     (PABEffects t env) [ContractSignatureResponse (ContractDef t)]
forall a b. (a -> b) -> a -> b
$ (ContractDef t -> ContractSignatureResponse (ContractDef t))
-> [ContractDef t] -> [ContractSignatureResponse (ContractDef t)]
forall a b. (a -> b) -> [a] -> [b]
map ContractDef t -> ContractSignatureResponse (ContractDef t)
forall t. t -> ContractSignatureResponse t
ContractSignatureResponse [ContractDef t]
availableContracts
    [(ContractInstanceId, PartiallyDecodedResponse PABReq)]
crActiveContractStates <- (ContractInstanceId
 -> Eff
      (PABEffects t env)
      (ContractInstanceId, PartiallyDecodedResponse PABReq))
-> [ContractInstanceId]
-> Eff
     (PABEffects t env)
     [(ContractInstanceId, PartiallyDecodedResponse PABReq)]
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
traverse (\ContractInstanceId
i -> ContractInstanceId -> Eff (PABEffects t env) (State t)
forall t (effs :: [* -> *]).
Member (ContractStore t) effs =>
ContractInstanceId -> Eff effs (State t)
Contract.getState @t ContractInstanceId
i Eff (PABEffects t env) (State t)
-> (State t
    -> Eff
         (PABEffects t env)
         (ContractInstanceId, PartiallyDecodedResponse PABReq))
-> Eff
     (PABEffects t env)
     (ContractInstanceId, PartiallyDecodedResponse PABReq)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \State t
s -> (ContractInstanceId, PartiallyDecodedResponse PABReq)
-> Eff
     (PABEffects t env)
     (ContractInstanceId, PartiallyDecodedResponse PABReq)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ContractInstanceId
i, ContractResponse Value Value PABResp PABReq
-> PartiallyDecodedResponse PABReq
forall s v.
ContractResponse Value Value s v -> PartiallyDecodedResponse v
fromResp (ContractResponse Value Value PABResp PABReq
 -> PartiallyDecodedResponse PABReq)
-> ContractResponse Value Value PABResp PABReq
-> PartiallyDecodedResponse PABReq
forall a b. (a -> b) -> a -> b
$ Proxy t -> State t -> ContractResponse Value Value PABResp PABReq
forall contract.
PABContract contract =>
Proxy contract
-> State contract -> ContractResponse Value Value PABResp PABReq
Contract.serialisableState (Proxy t
forall k (t :: k). Proxy t
Proxy @t) State t
s)) [ContractInstanceId]
activeContractIDs
    ContractReport (ContractDef t)
-> PABAction t env (ContractReport (ContractDef t))
forall (f :: * -> *) a. Applicative f => a -> f a
pure ContractReport :: forall t.
[ContractSignatureResponse t]
-> [(ContractInstanceId, PartiallyDecodedResponse PABReq)]
-> ContractReport t
ContractReport {[ContractSignatureResponse (ContractDef t)]
crAvailableContracts :: [ContractSignatureResponse (ContractDef t)]
crAvailableContracts :: [ContractSignatureResponse (ContractDef t)]
crAvailableContracts, [(ContractInstanceId, PartiallyDecodedResponse PABReq)]
crActiveContractStates :: [(ContractInstanceId, PartiallyDecodedResponse PABReq)]
crActiveContractStates :: [(ContractInstanceId, PartiallyDecodedResponse PABReq)]
crActiveContractStates}

combinedUpdates :: forall t env. WSState -> PABAction t env (STMStream CombinedWSStreamToClient)
combinedUpdates :: WSState -> PABAction t env (STMStream CombinedWSStreamToClient)
combinedUpdates WSState
wsState =
    WSState -> BlockchainEnv -> STMStream CombinedWSStreamToClient
combinedWSStreamToClient WSState
wsState
        (BlockchainEnv -> STMStream CombinedWSStreamToClient)
-> Eff (PABEffects t env) BlockchainEnv
-> PABAction t env (STMStream CombinedWSStreamToClient)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (forall (effs :: [* -> *]).
Member (Reader (PABEnvironment t env)) effs =>
Eff effs BlockchainEnv
forall t env (effs :: [* -> *]).
Member (Reader (PABEnvironment t env)) effs =>
Eff effs BlockchainEnv
Core.askBlockchainEnv @t @env)

-- | The subscriptions for a websocket (wallet funds and contract instance notifications)
data WSState = WSState
    { WSState -> TVar (Map ContractInstanceId InstanceState)
wsInstances :: STM.TVar (Map.Map ContractInstanceId Instances.InstanceState) -- ^ Contract instances that we want updates for
    , WSState -> TVar (Set PubKeyHash)
wsWallets   :: STM.TVar (Set PubKeyHash) -- ^ Wallets whose funds we are watching
    }

combinedWSStreamToClient :: WSState -> BlockchainEnv -> STMStream CombinedWSStreamToClient
combinedWSStreamToClient :: WSState -> BlockchainEnv -> STMStream CombinedWSStreamToClient
combinedWSStreamToClient WSState{TVar (Map ContractInstanceId InstanceState)
wsInstances :: TVar (Map ContractInstanceId InstanceState)
wsInstances :: WSState -> TVar (Map ContractInstanceId InstanceState)
wsInstances} BlockchainEnv
blockchainEnv = do
    Map ContractInstanceId InstanceState
instances <- (Map ContractInstanceId InstanceState -> Set ContractInstanceId)
-> STM (Map ContractInstanceId InstanceState)
-> STMStream (Map ContractInstanceId InstanceState)
forall a b. Eq b => (a -> b) -> STM a -> STMStream a
unfoldOn Map ContractInstanceId InstanceState -> Set ContractInstanceId
forall k a. Map k a -> Set k
Map.keysSet (TVar (Map ContractInstanceId InstanceState)
-> STM (Map ContractInstanceId InstanceState)
forall a. TVar a -> STM a
STM.readTVar TVar (Map ContractInstanceId InstanceState)
wsInstances)
    let mkInstanceStream :: (ContractInstanceId, InstanceState)
-> STMStream CombinedWSStreamToClient
mkInstanceStream (ContractInstanceId
instanceId, InstanceState
instanceState) = ContractInstanceId
-> InstanceStatusToClient -> CombinedWSStreamToClient
InstanceUpdate ContractInstanceId
instanceId (InstanceStatusToClient -> CombinedWSStreamToClient)
-> STMStream InstanceStatusToClient
-> STMStream CombinedWSStreamToClient
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> InstanceState -> STMStream InstanceStatusToClient
instanceUpdates InstanceState
instanceState
    [STMStream CombinedWSStreamToClient]
-> STMStream CombinedWSStreamToClient
forall (t :: * -> *) m. (Foldable t, Monoid m) => t m -> m
fold
        [ Slot -> CombinedWSStreamToClient
SlotChange (Slot -> CombinedWSStreamToClient)
-> STMStream Slot -> STMStream CombinedWSStreamToClient
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> BlockchainEnv -> STMStream Slot
slotChange BlockchainEnv
blockchainEnv
        , ((ContractInstanceId, InstanceState)
 -> STMStream CombinedWSStreamToClient)
-> [(ContractInstanceId, InstanceState)]
-> STMStream CombinedWSStreamToClient
forall (t :: * -> *) m a.
(Foldable t, Monoid m) =>
(a -> m) -> t a -> m
foldMap (ContractInstanceId, InstanceState)
-> STMStream CombinedWSStreamToClient
mkInstanceStream (Map ContractInstanceId InstanceState
-> [(ContractInstanceId, InstanceState)]
forall k a. Map k a -> [(k, a)]
Map.toList Map ContractInstanceId InstanceState
instances)
        ]

initialWSState :: STM WSState
initialWSState :: STM WSState
initialWSState = TVar (Map ContractInstanceId InstanceState)
-> TVar (Set PubKeyHash) -> WSState
WSState (TVar (Map ContractInstanceId InstanceState)
 -> TVar (Set PubKeyHash) -> WSState)
-> STM (TVar (Map ContractInstanceId InstanceState))
-> STM (TVar (Set PubKeyHash) -> WSState)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Map ContractInstanceId InstanceState
-> STM (TVar (Map ContractInstanceId InstanceState))
forall a. a -> STM (TVar a)
STM.newTVar Map ContractInstanceId InstanceState
forall a. Monoid a => a
mempty STM (TVar (Set PubKeyHash) -> WSState)
-> STM (TVar (Set PubKeyHash)) -> STM WSState
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Set PubKeyHash -> STM (TVar (Set PubKeyHash))
forall a. a -> STM (TVar a)
STM.newTVar Set PubKeyHash
forall a. Monoid a => a
mempty

slotChange :: BlockchainEnv -> STMStream Slot
slotChange :: BlockchainEnv -> STMStream Slot
slotChange = STM Slot -> STMStream Slot
forall a. Eq a => STM a -> STMStream a
unfold (STM Slot -> STMStream Slot)
-> (BlockchainEnv -> STM Slot) -> BlockchainEnv -> STMStream Slot
forall b c a. (b -> c) -> (a -> b) -> a -> c
. BlockchainEnv -> STM Slot
Instances.currentSlot

observableStateChange :: Instances.InstanceState -> STMStream JSON.Value
observableStateChange :: InstanceState -> STMStream Value
observableStateChange = STM Value -> STMStream Value
forall a. Eq a => STM a -> STMStream a
unfold (STM Value -> STMStream Value)
-> (InstanceState -> STM Value) -> InstanceState -> STMStream Value
forall b c a. (b -> c) -> (a -> b) -> a -> c
. InstanceState -> STM Value
Instances.observableContractState

openEndpoints :: Instances.InstanceState -> STMStream [ActiveEndpoint]
openEndpoints :: InstanceState -> STMStream [ActiveEndpoint]
openEndpoints InstanceState
instanceState =
    STM [ActiveEndpoint] -> STMStream [ActiveEndpoint]
forall a. Eq a => STM a -> STMStream a
unfold (STM [ActiveEndpoint] -> STMStream [ActiveEndpoint])
-> STM [ActiveEndpoint] -> STMStream [ActiveEndpoint]
forall a b. (a -> b) -> a -> b
$ (Map (RequestID, IterationID) OpenEndpoint -> [ActiveEndpoint])
-> STM (Map (RequestID, IterationID) OpenEndpoint)
-> STM [ActiveEndpoint]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ((((RequestID, IterationID), OpenEndpoint) -> ActiveEndpoint)
-> [((RequestID, IterationID), OpenEndpoint)] -> [ActiveEndpoint]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (OpenEndpoint -> ActiveEndpoint
oepName (OpenEndpoint -> ActiveEndpoint)
-> (((RequestID, IterationID), OpenEndpoint) -> OpenEndpoint)
-> ((RequestID, IterationID), OpenEndpoint)
-> ActiveEndpoint
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ((RequestID, IterationID), OpenEndpoint) -> OpenEndpoint
forall a b. (a, b) -> b
snd) ([((RequestID, IterationID), OpenEndpoint)] -> [ActiveEndpoint])
-> (Map (RequestID, IterationID) OpenEndpoint
    -> [((RequestID, IterationID), OpenEndpoint)])
-> Map (RequestID, IterationID) OpenEndpoint
-> [ActiveEndpoint]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Map (RequestID, IterationID) OpenEndpoint
-> [((RequestID, IterationID), OpenEndpoint)]
forall k a. Map k a -> [(k, a)]
Map.toList) (STM (Map (RequestID, IterationID) OpenEndpoint)
 -> STM [ActiveEndpoint])
-> STM (Map (RequestID, IterationID) OpenEndpoint)
-> STM [ActiveEndpoint]
forall a b. (a -> b) -> a -> b
$ InstanceState -> STM (Map (RequestID, IterationID) OpenEndpoint)
Instances.openEndpoints InstanceState
instanceState

yieldedExportTxsChange :: Instances.InstanceState -> STMStream [ExportTx]
yieldedExportTxsChange :: InstanceState -> STMStream [ExportTx]
yieldedExportTxsChange = STM [ExportTx] -> STMStream [ExportTx]
forall a. Eq a => STM a -> STMStream a
unfold (STM [ExportTx] -> STMStream [ExportTx])
-> (InstanceState -> STM [ExportTx])
-> InstanceState
-> STMStream [ExportTx]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. InstanceState -> STM [ExportTx]
Instances.yieldedExportTxs

finalValue :: Instances.InstanceState -> STMStream (Maybe JSON.Value)
finalValue :: InstanceState -> STMStream (Maybe Value)
finalValue = STM (Maybe Value) -> STMStream (Maybe Value)
forall a. STM a -> STMStream a
singleton (STM (Maybe Value) -> STMStream (Maybe Value))
-> (InstanceState -> STM (Maybe Value))
-> InstanceState
-> STMStream (Maybe Value)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. InstanceState -> STM (Maybe Value)
Instances.finalResult

-- | Get a stream of instance updates for a given 'InstanceState'
instanceUpdates :: Instances.InstanceState -> STMStream InstanceStatusToClient
instanceUpdates :: InstanceState -> STMStream InstanceStatusToClient
instanceUpdates InstanceState
instanceState =
    [STMStream InstanceStatusToClient]
-> STMStream InstanceStatusToClient
forall (t :: * -> *) m. (Foldable t, Monoid m) => t m -> m
fold ([STMStream InstanceStatusToClient]
 -> STMStream InstanceStatusToClient)
-> [STMStream InstanceStatusToClient]
-> STMStream InstanceStatusToClient
forall a b. (a -> b) -> a -> b
$
        [ Value -> InstanceStatusToClient
NewObservableState  (Value -> InstanceStatusToClient)
-> STMStream Value -> STMStream InstanceStatusToClient
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> InstanceState -> STMStream Value
observableStateChange InstanceState
instanceState
        , [ActiveEndpoint] -> InstanceStatusToClient
NewActiveEndpoints  ([ActiveEndpoint] -> InstanceStatusToClient)
-> STMStream [ActiveEndpoint] -> STMStream InstanceStatusToClient
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> InstanceState -> STMStream [ActiveEndpoint]
openEndpoints InstanceState
instanceState
        , [ExportTx] -> InstanceStatusToClient
NewYieldedExportTxs ([ExportTx] -> InstanceStatusToClient)
-> STMStream [ExportTx] -> STMStream InstanceStatusToClient
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> InstanceState -> STMStream [ExportTx]
yieldedExportTxsChange InstanceState
instanceState
        , Maybe Value -> InstanceStatusToClient
ContractFinished    (Maybe Value -> InstanceStatusToClient)
-> STMStream (Maybe Value) -> STMStream InstanceStatusToClient
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> InstanceState -> STMStream (Maybe Value)
finalValue InstanceState
instanceState
        ]

-- | Send all updates from an 'STMStream' to a websocket until it finishes.
streamToWebsocket :: forall t env a. ToJSON a => Connection -> STMStream a -> PABAction t env ()
streamToWebsocket :: Connection -> STMStream a -> PABAction t env ()
streamToWebsocket Connection
connection STMStream a
stream = IO () -> PABAction t env ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> PABAction t env ()) -> IO () -> PABAction t env ()
forall a b. (a -> b) -> a -> b
$
    STMStream a -> (a -> IO ()) -> IO () -> IO ()
forall a. STMStream a -> (a -> IO ()) -> IO () -> IO ()
foldM STMStream a
stream (Connection -> ByteString -> IO ()
forall a. WebSocketsData a => Connection -> a -> IO ()
WS.sendTextData Connection
connection (ByteString -> IO ()) -> (a -> ByteString) -> a -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> ByteString
forall a. ToJSON a => a -> ByteString
JSON.encode) (() -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ())

-- | Handler for WSAPI
wsHandler ::
    forall t env.
    (ContractInstanceId -> PendingConnection -> PABAction t env ())
    :<|> (PendingConnection -> PABAction t env ())
wsHandler :: (ContractInstanceId -> PendingConnection -> PABAction t env ())
:<|> (PendingConnection -> PABAction t env ())
wsHandler =
    ContractInstanceId -> PendingConnection -> PABAction t env ()
forall t env.
ContractInstanceId -> PendingConnection -> PABAction t env ()
contractInstanceUpdates (ContractInstanceId -> PendingConnection -> PABAction t env ())
-> (PendingConnection -> PABAction t env ())
-> (ContractInstanceId -> PendingConnection -> PABAction t env ())
   :<|> (PendingConnection -> PABAction t env ())
forall a b. a -> b -> a :<|> b
:<|> PendingConnection -> PABAction t env ()
forall t env. PendingConnection -> PABAction t env ()
combinedWebsocket

sendContractInstanceUpdatesToClient :: forall t env. ContractInstanceId -> Connection -> PABAction t env ()
sendContractInstanceUpdatesToClient :: ContractInstanceId -> Connection -> PABAction t env ()
sendContractInstanceUpdatesToClient ContractInstanceId
instanceId Connection
connection = do
    InstanceState
instanceState <- ContractInstanceId -> PABAction t env InstanceState
forall t env. ContractInstanceId -> PABAction t env InstanceState
Core.instanceStateInternal @t @env ContractInstanceId
instanceId
    Connection
-> STMStream InstanceStatusToClient -> PABAction t env ()
forall t env a.
ToJSON a =>
Connection -> STMStream a -> PABAction t env ()
streamToWebsocket Connection
connection (InstanceState -> STMStream InstanceStatusToClient
instanceUpdates InstanceState
instanceState)

contractInstanceUpdates :: forall t env. ContractInstanceId -> PendingConnection -> PABAction t env ()
contractInstanceUpdates :: ContractInstanceId -> PendingConnection -> PABAction t env ()
contractInstanceUpdates ContractInstanceId
contractInstanceId PendingConnection
pending = do
    Core.PABRunner{forall a. PABAction t env a -> IO (Either PABError a)
runPABAction :: forall t env.
PABRunner t env
-> forall a. PABAction t env a -> IO (Either PABError a)
runPABAction :: forall a. PABAction t env a -> IO (Either PABError a)
Core.runPABAction} <- PABAction t env (PABRunner t env)
forall t env. PABAction t env (PABRunner t env)
Core.pabRunner
    IO () -> PABAction t env ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> PABAction t env ()) -> IO () -> PABAction t env ()
forall a b. (a -> b) -> a -> b
$ do
        Connection
connection <- PendingConnection -> IO Connection
WS.acceptRequest PendingConnection
pending
        (SomeException -> IO ()) -> IO () -> IO ()
forall e a. Exception e => (e -> IO a) -> IO a -> IO a
handle SomeException -> IO ()
disconnect (IO () -> IO ()) -> (IO () -> IO ()) -> IO () -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Connection -> Int -> IO () -> IO () -> IO ()
forall a. Connection -> Int -> IO () -> IO a -> IO a
WS.withPingThread Connection
connection Int
30 (() -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ (Either PABError () -> ()) -> IO (Either PABError ()) -> IO ()
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ((PABError -> ()) -> (() -> ()) -> Either PABError () -> ()
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either ([Char] -> ()
forall a. HasCallStack => [Char] -> a
error ([Char] -> ()) -> (PABError -> [Char]) -> PABError -> ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. PABError -> [Char]
forall a. Show a => a -> [Char]
show) () -> ()
forall a. a -> a
id) (IO (Either PABError ()) -> IO ())
-> (PABAction t env () -> IO (Either PABError ()))
-> PABAction t env ()
-> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. PABAction t env () -> IO (Either PABError ())
forall a. PABAction t env a -> IO (Either PABError a)
runPABAction (PABAction t env () -> IO ()) -> PABAction t env () -> IO ()
forall a b. (a -> b) -> a -> b
$ ContractInstanceId -> Connection -> PABAction t env ()
forall t env.
ContractInstanceId -> Connection -> PABAction t env ()
sendContractInstanceUpdatesToClient ContractInstanceId
contractInstanceId Connection
connection
  where
    disconnect :: SomeException -> IO ()
    disconnect :: SomeException -> IO ()
disconnect SomeException
_ = () -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()

combinedWebsocket :: forall t env. PendingConnection -> PABAction t env ()
combinedWebsocket :: PendingConnection -> PABAction t env ()
combinedWebsocket PendingConnection
pending = do
    PABRunner t env
pabRunner <- PABAction t env (PABRunner t env)
forall t env. PABAction t env (PABRunner t env)
Core.pabRunner
    WSState
wsState <- IO WSState -> Eff (PABEffects t env) WSState
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO WSState -> Eff (PABEffects t env) WSState)
-> IO WSState -> Eff (PABEffects t env) WSState
forall a b. (a -> b) -> a -> b
$ STM WSState -> IO WSState
forall a. STM a -> IO a
STM.atomically STM WSState
initialWSState
    IO () -> PABAction t env ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> PABAction t env ()) -> IO () -> PABAction t env ()
forall a b. (a -> b) -> a -> b
$ do
        Connection
connection <- PendingConnection -> IO Connection
WS.acceptRequest PendingConnection
pending
        (SomeException -> IO ()) -> IO () -> IO ()
forall e a. Exception e => (e -> IO a) -> IO a -> IO a
handle SomeException -> IO ()
disconnect (IO () -> IO ()) -> (IO () -> IO ()) -> IO () -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Connection -> Int -> IO () -> IO () -> IO ()
forall a. Connection -> Int -> IO () -> IO a -> IO a
WS.withPingThread Connection
connection Int
30 (() -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ PABRunner t env -> WSState -> Connection -> IO ()
forall t env. PABRunner t env -> WSState -> Connection -> IO ()
combinedWebsocketThread PABRunner t env
pabRunner WSState
wsState Connection
connection
  where
    disconnect :: SomeException -> IO ()
    disconnect :: SomeException -> IO ()
disconnect SomeException
_ = () -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()

combinedWebsocketThread :: forall t env. Core.PABRunner t env -> WSState -> Connection -> IO ()
combinedWebsocketThread :: PABRunner t env -> WSState -> Connection -> IO ()
combinedWebsocketThread Core.PABRunner{forall a. PABAction t env a -> IO (Either PABError a)
runPABAction :: forall a. PABAction t env a -> IO (Either PABError a)
runPABAction :: forall t env.
PABRunner t env
-> forall a. PABAction t env a -> IO (Either PABError a)
Core.runPABAction} WSState
wsState Connection
connection = do
        [Async (Either PABError ())]
tasks :: [Async (Either PABError ())] <-
            (PABAction t env () -> IO (Async (Either PABError ())))
-> [PABAction t env ()] -> IO [Async (Either PABError ())]
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
traverse
                PABAction t env () -> IO (Async (Either PABError ()))
asyncApp
                [ Connection -> WSState -> PABAction t env ()
forall t env. Connection -> WSState -> PABAction t env ()
sendCombinedUpdatesToClient Connection
connection WSState
wsState
                , Connection -> WSState -> PABAction t env ()
forall t env. Connection -> WSState -> PABAction t env ()
receiveMessagesFromClient Connection
connection WSState
wsState
                ]
        IO (Async (Either PABError ()), Either PABError ()) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (Async (Either PABError ()), Either PABError ()) -> IO ())
-> IO (Async (Either PABError ()), Either PABError ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ [Async (Either PABError ())]
-> IO (Async (Either PABError ()), Either PABError ())
forall a. [Async a] -> IO (Async a, a)
waitAnyCancel [Async (Either PABError ())]
tasks
    where
        asyncApp :: PABAction t env () -> IO (Async (Either PABError ()))
asyncApp = IO (Either PABError ()) -> IO (Async (Either PABError ()))
forall a. IO a -> IO (Async a)
async (IO (Either PABError ()) -> IO (Async (Either PABError ())))
-> (PABAction t env () -> IO (Either PABError ()))
-> PABAction t env ()
-> IO (Async (Either PABError ()))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. PABAction t env () -> IO (Either PABError ())
forall a. PABAction t env a -> IO (Either PABError a)
runPABAction

sendCombinedUpdatesToClient :: forall t env. Connection -> WSState -> PABAction t env ()
sendCombinedUpdatesToClient :: Connection -> WSState -> PABAction t env ()
sendCombinedUpdatesToClient Connection
connection WSState
wsState = WSState -> PABAction t env (STMStream CombinedWSStreamToClient)
forall t env.
WSState -> PABAction t env (STMStream CombinedWSStreamToClient)
combinedUpdates WSState
wsState PABAction t env (STMStream CombinedWSStreamToClient)
-> (STMStream CombinedWSStreamToClient -> PABAction t env ())
-> PABAction t env ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Connection
-> STMStream CombinedWSStreamToClient -> PABAction t env ()
forall t env a.
ToJSON a =>
Connection -> STMStream a -> PABAction t env ()
streamToWebsocket Connection
connection

receiveMessagesFromClient :: forall t env. Connection -> WSState -> PABAction t env ()
receiveMessagesFromClient :: Connection -> WSState -> PABAction t env ()
receiveMessagesFromClient Connection
connection WSState
wsState = PABAction t env () -> PABAction t env ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (PABAction t env () -> PABAction t env ())
-> PABAction t env () -> PABAction t env ()
forall a b. (a -> b) -> a -> b
$ do
    ByteString
msg <- IO ByteString -> Eff (PABEffects t env) ByteString
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ByteString -> Eff (PABEffects t env) ByteString)
-> IO ByteString -> Eff (PABEffects t env) ByteString
forall a b. (a -> b) -> a -> b
$ Connection -> IO ByteString
forall a. WebSocketsData a => Connection -> IO a
WS.receiveData Connection
connection
    let result :: Either Text CombinedWSStreamToServer
        result :: Either Text CombinedWSStreamToServer
result = ([Char] -> Text)
-> Either [Char] CombinedWSStreamToServer
-> Either Text CombinedWSStreamToServer
forall (p :: * -> * -> *) a b c.
Bifunctor p =>
(a -> b) -> p a c -> p b c
first [Char] -> Text
Text.pack (Either [Char] CombinedWSStreamToServer
 -> Either Text CombinedWSStreamToServer)
-> Either [Char] CombinedWSStreamToServer
-> Either Text CombinedWSStreamToServer
forall a b. (a -> b) -> a -> b
$ ByteString -> Either [Char] CombinedWSStreamToServer
forall a. FromJSON a => ByteString -> Either [Char] a
JSON.eitherDecode ByteString
msg
    case Either Text CombinedWSStreamToServer
result of
        Right (Subscribe (Right PubKeyHash
l)) -> IO () -> PABAction t env ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> PABAction t env ()) -> IO () -> PABAction t env ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
STM.atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ WSState -> PubKeyHash -> STM ()
addWallet WSState
wsState PubKeyHash
l
        Right (Subscribe (Left ContractInstanceId
i))  -> do
            InstanceState
state <- ContractInstanceId -> PABAction t env InstanceState
forall t env. ContractInstanceId -> PABAction t env InstanceState
Core.instanceStateInternal ContractInstanceId
i
            IO () -> PABAction t env ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> PABAction t env ()) -> IO () -> PABAction t env ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
STM.atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ WSState -> ContractInstanceId -> InstanceState -> STM ()
addInstanceId WSState
wsState ContractInstanceId
i InstanceState
state
        Right (Unsubscribe Either ContractInstanceId PubKeyHash
l) -> IO () -> PABAction t env ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> PABAction t env ()) -> IO () -> PABAction t env ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
STM.atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ (ContractInstanceId -> STM ())
-> (PubKeyHash -> STM ())
-> Either ContractInstanceId PubKeyHash
-> STM ()
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either (WSState -> ContractInstanceId -> STM ()
removeInstanceId WSState
wsState) (WSState -> PubKeyHash -> STM ()
removeWallet WSState
wsState) Either ContractInstanceId PubKeyHash
l
        Left Text
e                -> PABError -> PABAction t env ()
forall e (effs :: [* -> *]) a.
Member (Error e) effs =>
e -> Eff effs a
throwError (Text -> PABError
OtherError Text
e)

addInstanceId :: WSState -> ContractInstanceId -> Instances.InstanceState -> STM ()
addInstanceId :: WSState -> ContractInstanceId -> InstanceState -> STM ()
addInstanceId WSState{TVar (Map ContractInstanceId InstanceState)
wsInstances :: TVar (Map ContractInstanceId InstanceState)
wsInstances :: WSState -> TVar (Map ContractInstanceId InstanceState)
wsInstances} ContractInstanceId
k InstanceState
v = TVar (Map ContractInstanceId InstanceState)
-> (Map ContractInstanceId InstanceState
    -> Map ContractInstanceId InstanceState)
-> STM ()
forall a. TVar a -> (a -> a) -> STM ()
STM.modifyTVar TVar (Map ContractInstanceId InstanceState)
wsInstances (ContractInstanceId
-> InstanceState
-> Map ContractInstanceId InstanceState
-> Map ContractInstanceId InstanceState
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert ContractInstanceId
k InstanceState
v)

addWallet :: WSState -> PubKeyHash -> STM ()
addWallet :: WSState -> PubKeyHash -> STM ()
addWallet WSState{TVar (Set PubKeyHash)
wsWallets :: TVar (Set PubKeyHash)
wsWallets :: WSState -> TVar (Set PubKeyHash)
wsWallets} PubKeyHash
w = TVar (Set PubKeyHash)
-> (Set PubKeyHash -> Set PubKeyHash) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
STM.modifyTVar TVar (Set PubKeyHash)
wsWallets (PubKeyHash -> Set PubKeyHash -> Set PubKeyHash
forall a. Ord a => a -> Set a -> Set a
Set.insert PubKeyHash
w)

removeInstanceId :: WSState -> ContractInstanceId -> STM ()
removeInstanceId :: WSState -> ContractInstanceId -> STM ()
removeInstanceId WSState{TVar (Map ContractInstanceId InstanceState)
wsInstances :: TVar (Map ContractInstanceId InstanceState)
wsInstances :: WSState -> TVar (Map ContractInstanceId InstanceState)
wsInstances} ContractInstanceId
i = TVar (Map ContractInstanceId InstanceState)
-> (Map ContractInstanceId InstanceState
    -> Map ContractInstanceId InstanceState)
-> STM ()
forall a. TVar a -> (a -> a) -> STM ()
STM.modifyTVar TVar (Map ContractInstanceId InstanceState)
wsInstances (ContractInstanceId
-> Map ContractInstanceId InstanceState
-> Map ContractInstanceId InstanceState
forall k a. Ord k => k -> Map k a -> Map k a
Map.delete ContractInstanceId
i)

removeWallet :: WSState -> PubKeyHash -> STM ()
removeWallet :: WSState -> PubKeyHash -> STM ()
removeWallet WSState{TVar (Set PubKeyHash)
wsWallets :: TVar (Set PubKeyHash)
wsWallets :: WSState -> TVar (Set PubKeyHash)
wsWallets} PubKeyHash
w = TVar (Set PubKeyHash)
-> (Set PubKeyHash -> Set PubKeyHash) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
STM.modifyTVar TVar (Set PubKeyHash)
wsWallets (PubKeyHash -> Set PubKeyHash -> Set PubKeyHash
forall a. Ord a => a -> Set a -> Set a
Set.delete PubKeyHash
w)