diff --git a/cabal.project b/cabal.project index 38a7def..e78b2ed 100644 --- a/cabal.project +++ b/cabal.project @@ -37,6 +37,18 @@ if(os(windows)) constraints: bitvec -simd + +source-repository-package + type: git + location: https://github.com/IntersectMBO/ouroboros-network + tag: 58f3ffb5abf74cd105f7c10593e01205dced7a9e + --sha256: sha256-x0feBtpO3zsvvq8Ocu5YEJFuRsVS/ptnLMEhXCAG+WI= + subdir: + acts-generic + cardano-diffusion + ouroboros-network + network-mux + if impl(ghc >= 9.12.0) allow-newer: *:time, *:nothunks, diff --git a/dmq-node/app/Main.hs b/dmq-node/app/Main.hs index b43819d..28849b6 100644 --- a/dmq-node/app/Main.hs +++ b/dmq-node/app/Main.hs @@ -50,7 +50,6 @@ import DMQ.Diffusion.NodeKernel import DMQ.Diffusion.PeerSelection (policy) import DMQ.Handlers.TopLevel (toplevelExceptionHandler) import DMQ.NodeToClient qualified as NtC -import DMQ.NodeToClient.LocalStateQueryClient import DMQ.NodeToNode (NodeToNodeVersion, dmqCodecs, dmqLimitsAndTimeouts, ntnApps) import DMQ.Policy qualified as Policy @@ -93,8 +92,7 @@ runDMQ commandLineConfig = do dmqConfig@Configuration { dmqcTopologyFile = I topologyFile, dmqcCardanoNodeSocket = I socketPath, - dmqcVersion = I version, - dmqcLedgerPeers = I ledgerPeers + dmqcVersion = I version } = fromRight mempty config' <> commandLineConfig `act` @@ -122,7 +120,6 @@ runDMQ commandLineConfig = do ( dmqTracers@DMQTracers { dmqStartupTracer, - localStateQueryClientTracer, sigValidationTracer, localSigValidationTracer, cardanoNodeHandshakeTracer @@ -170,18 +167,12 @@ runDMQ commandLineConfig = do -- TODO: this might not work, since `ouroboros-network` creates its own IO Completion Port. withIOManager \iocp -> do - let localSnocket' = localSnocket iocp - mkStakePoolMonitor = connectToCardanoNode - localStateQueryClientTracer - ledgerPeers - localSnocket' - socketPath - withNodeKernel @StandardCrypto dmqTracers + (localSnocket iocp) + makeLocalBearer dmqConfig - psRng - mkStakePoolMonitor $ \nodeKernel -> do + psRng $ \nodeKernel -> do dmqDiffusionConfiguration <- mkDiffusionConfiguration dmqConfig nt nodeKernel.stakePools.ledgerBigPeersVar diff --git a/dmq-node/dmq-node.cabal b/dmq-node/dmq-node.cabal index 508baec..a135f9e 100644 --- a/dmq-node/dmq-node.cabal +++ b/dmq-node/dmq-node.cabal @@ -110,6 +110,7 @@ library cardano-ledger-byron, cardano-ledger-core, cardano-ledger-shelley, + cardano-protocol-tpraos, cardano-slotting, cardano-strict-containers, cborg >=0.2.1 && <0.3, @@ -136,7 +137,7 @@ library singletons, text >=1.2.4 && <2.2, time >=1.12 && <1.15, - trace-dispatcher ^>=2.12.0, + trace-dispatcher ^>=2.11.0 || ^>=2.12.0, transformers, typed-protocols:{typed-protocols, cborg} ^>=1.2, diff --git a/dmq-node/src/DMQ/Diffusion/NodeKernel.hs b/dmq-node/src/DMQ/Diffusion/NodeKernel.hs index fe8980d..7b9f984 100644 --- a/dmq-node/src/DMQ/Diffusion/NodeKernel.hs +++ b/dmq-node/src/DMQ/Diffusion/NodeKernel.hs @@ -1,15 +1,19 @@ -{-# LANGUAGE DataKinds #-} -{-# LANGUAGE PackageImports #-} -{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE DuplicateRecordFields #-} +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE PackageImports #-} +{-# LANGUAGE RankNTypes #-} module DMQ.Diffusion.NodeKernel ( module DMQ.Diffusion.NodeKernel.Types , withNodeKernel ) where +import Control.Applicative (Alternative) import Control.Concurrent.Class.MonadMVar import Control.Concurrent.Class.MonadSTM.Strict import Control.Monad.Class.MonadAsync +import Control.Monad.Class.MonadST import Control.Monad.Class.MonadThrow import Control.Monad.Class.MonadTime.SI import Control.Monad.Class.MonadTimer.SI @@ -18,22 +22,44 @@ import "contra-tracer" Control.Tracer (nullTracer) import Data.Function (on) import Data.Hashable import Data.Map.Strict qualified as Map +import Data.Proxy import Data.Sequence (Seq) import Data.Sequence qualified as Seq import Data.Set (Set) import Data.Set qualified as Set import Data.Time.Clock.POSIX (POSIXTime) import Data.Time.Clock.POSIX qualified as Time -import Data.Void (Void) +import Data.Void (Void, absurd) import System.Random (StdGen) import System.Random qualified as Random +import Network.Mux qualified as Mx + +import Cardano.Chain.Slotting (EpochSlots (..)) +import Cardano.Network.NodeToClient qualified as Cardano.NtoC +import Cardano.Protocol.Crypto qualified as Cardano (StandardCrypto) + +import Ouroboros.Consensus.Cardano.Block (CardanoBlock) +import Ouroboros.Consensus.Cardano.Node +import Ouroboros.Consensus.Network.NodeToClient +import Ouroboros.Consensus.Node.NetworkProtocolVersion +import Ouroboros.Consensus.Node.ProtocolInfo + import Ouroboros.Network.BlockFetch (newFetchClientRegistry) -import Ouroboros.Network.Magic (NetworkMagic (..)) +import Ouroboros.Network.Handshake.Queryable (Queryable (..)) +import Ouroboros.Network.Mux qualified as Mx import Ouroboros.Network.PeerSelection.Governor.Types (makePublicPeerSelectionStateVar) import Ouroboros.Network.PeerSharing (newPeerSharingAPI, newPeerSharingRegistry, ps_POLICY_PEER_SHARE_MAX_PEERS, ps_POLICY_PEER_SHARE_STICKY_TIME) +import Ouroboros.Network.Protocol.Handshake (Acceptable (..)) +import Ouroboros.Network.Protocol.Handshake.Codec (cborTermVersionDataCodec, + noTimeLimitsHandshake) +import Ouroboros.Network.Protocol.LocalStateQuery.Client +import Ouroboros.Network.Protocol.LocalStateQuery.Type +import Ouroboros.Network.Snocket (Snocket, localAddressFromPath) +import Ouroboros.Network.Socket (ConnectToArgs (..), + HandshakeCallbacks (HandshakeCallbacks), connectToNode) import Ouroboros.Network.TxSubmission.Inbound.V2 import Ouroboros.Network.TxSubmission.Mempool.Simple (Mempool (..), MempoolSeq (..), WithIndex (..)) @@ -41,10 +67,12 @@ import Ouroboros.Network.TxSubmission.Mempool.Simple qualified as Mempool import DMQ.Configuration import DMQ.Diffusion.NodeKernel.Types +import DMQ.NodeToClient.LocalStateQueryClient import DMQ.Policy qualified as Policy import DMQ.Protocol.SigSubmission.Type (Sig (sigExpiresAt, sigId), SigId) import DMQ.Tracer + newNodeKernel :: forall crypto ntnAddr m. ( MonadLabelledSTM m , MonadMVar m @@ -107,30 +135,43 @@ newNodeKernel rng = do withNodeKernel :: forall crypto ntnAddr ntcAddr m a. - ( MonadAsync m - , MonadFork m - , MonadDelay m - , MonadLabelledSTM m - , MonadMask m - , MonadMVar m - , MonadTime m + ( Alternative (STM m) + , MonadAsync m + , MonadEvaluate m + , MonadFork m + , MonadDelay m + , MonadLabelledSTM m + , MonadMask m + , MonadMVar m + , Mx.MonadReadBuffer m + , MonadST m + , MonadThrow (STM m) + , MonadTime m + , MonadTimer m , Ord ntnAddr , Hashable ntnAddr ) => DMQTracers crypto ntnAddr ntcAddr m + -> Snocket m Cardano.NtoC.LocalSocket LocalAddress + -> Mx.MakeBearer m Cardano.NtoC.LocalSocket -> Configuration -> StdGen - -> (NetworkMagic -> NodeKernel crypto ntnAddr m -> m (Either SomeException Void)) -> (NodeKernel crypto ntnAddr m -> m a) -- ^ as soon as the callback exits the `mempoolWorker` and all -- decision logic threads will be killed -> m a -withNodeKernel DMQTracers { sigSubmissionLogicTracer } +withNodeKernel DMQTracers { sigSubmissionLogicTracer, + localStateQueryClientTracer + } + localSnocket + mkLocalBearer Configuration { - dmqcCardanoNetworkMagic = I networkMagic + dmqcCardanoNetworkMagic = I networkMagic, + dmqcCardanoNodeSocket = I cardanoNodeSocketPath, + dmqcLedgerPeers = I ledgerPeers } rng - mkStakePoolMonitor k = do + k = do nodeKernel@NodeKernel { mempool, sigChannelVar, sigSharedTxStateVar @@ -145,11 +186,77 @@ withNodeKernel DMQTracers { sigSubmissionLogicTracer } sigChannelVar sigSharedTxStateVar) $ \sigLogicThread -> - withAsync (mkStakePoolMonitor networkMagic nodeKernel) \spmAid -> do + withAsync (connectToCardanoNode nodeKernel) \spmAid -> do link mempoolThread link sigLogicThread link spmAid k nodeKernel + where + connectToCardanoNode :: NodeKernel crypto ntnAddr m + -> m (Either SomeException Void) + connectToCardanoNode nodeKernel = + fmap fn <$> + connectToNode + localSnocket + mkLocalBearer + ConnectToArgs { + ctaHandshakeCodec = Cardano.NtoC.nodeToClientHandshakeCodec, + ctaHandshakeTimeLimits = noTimeLimitsHandshake, + ctaVersionDataCodec = cborTermVersionDataCodec Cardano.NtoC.nodeToClientCodecCBORTerm, + ctaConnectTracers = Cardano.NtoC.nullNetworkConnectTracers, --debuggingNetworkConnectTracers, + ctaHandshakeCallbacks = HandshakeCallbacks acceptableVersion queryVersion + } + (\_ -> return ()) + (Cardano.NtoC.combineVersions + [ Cardano.NtoC.simpleSingletonVersions + version + Cardano.NtoC.NodeToClientVersionData { + Cardano.NtoC.networkMagic + , Cardano.NtoC.query = False + } + \_version -> + Mx.OuroborosApplication + [ Mx.MiniProtocol + { Mx.miniProtocolNum = Mx.MiniProtocolNum 7 + , Mx.miniProtocolStart = Mx.StartEagerly + , Mx.miniProtocolLimits = + Mx.MiniProtocolLimits + { Mx.maximumIngressQueue = 0xffffffff + } + , Mx.miniProtocolRun = + Mx.InitiatorProtocolOnly + . Mx.mkMiniProtocolCbFromPeerSt + . const + $ ( nullTracer -- TODO: add tracer + , cStateQueryCodec + , StateIdle + , localStateQueryClientPeer $ + cardanoLocalStateQueryClient + localStateQueryClientTracer + ledgerPeers + (stakePools nodeKernel) + (nextEpochVar nodeKernel) + ) + } + ] + | version <- [minBound..maxBound] + , let -- NOTE: the query protocol is running using + -- `Cardano.StandardCrypto`, while `dmq-node` is using + -- `StandardCrypto` defined in `kes-agent-krypto`. A priori + -- cryptography could differ but it shouldn't be a problem. We + -- are querying + supportedVersionMap = + supportedNodeToClientVersions (Proxy :: Proxy (CardanoBlock Cardano.StandardCrypto)) + blk = supportedVersionMap Map.! version + Codecs {cStateQueryCodec} = + clientCodecs (pClientInfoCodecConfig . protocolClientInfoCardano $ EpochSlots 21600) + blk version + ]) + Nothing + (localAddressFromPath cardanoNodeSocketPath) + where + fn :: forall x. Either x Void -> x + fn = either id absurd mempoolWorker :: forall crypto m. diff --git a/dmq-node/src/DMQ/NodeToClient/LocalStateQueryClient.hs b/dmq-node/src/DMQ/NodeToClient/LocalStateQueryClient.hs index e19e8b1..7927c94 100644 --- a/dmq-node/src/DMQ/NodeToClient/LocalStateQueryClient.hs +++ b/dmq-node/src/DMQ/NodeToClient/LocalStateQueryClient.hs @@ -5,8 +5,8 @@ module DMQ.NodeToClient.LocalStateQueryClient ( TraceLocalStateQueryClient (..) - , cardanoClient - , connectToCardanoNode + , CardanoLocalStateQueryClient + , cardanoLocalStateQueryClient ) where import Control.Concurrent.Class.MonadSTM.Strict @@ -15,38 +15,28 @@ import Control.Monad.Class.MonadThrow import Control.Monad.Class.MonadTime.SI import Control.Monad.Class.MonadTimer.SI import Control.Monad.Trans.Except -import "contra-tracer" Control.Tracer (Tracer, nullTracer, traceWith) +import "contra-tracer" Control.Tracer (Tracer, traceWith) import Data.Functor ((<&>)) import Data.List.NonEmpty qualified as NonEmpty -import Data.Map.Strict qualified as Map -import Data.Proxy import Data.Void -import Cardano.Chain.Slotting (EpochSlots (..)) import Cardano.Ledger.Api.State.Query (StakeSnapshots (..)) -import Cardano.Network.NodeToClient import Cardano.Network.PeerSelection (LedgerPeerSnapshot (..), LedgerRelayAccessPoint (..), SingLedgerPeersKind (..)) import Cardano.Slotting.EpochInfo.API import Cardano.Slotting.Time -import DMQ.Diffusion.NodeKernel +import DMQ.Diffusion.NodeKernel.Types (StakePools (..)) + import Ouroboros.Consensus.Cardano.Block -import Ouroboros.Consensus.Cardano.Node import Ouroboros.Consensus.HardFork.Combinator.Ledger.Query import Ouroboros.Consensus.HardFork.History.EpochInfo (interpreterToEpochInfo) import Ouroboros.Consensus.Ledger.Query (Query (..)) -import Ouroboros.Consensus.Network.NodeToClient -import Ouroboros.Consensus.Node.NetworkProtocolVersion -import Ouroboros.Consensus.Node.ProtocolInfo import Ouroboros.Consensus.Shelley.Ledger.Query import Ouroboros.Consensus.Shelley.Ledger.SupportsProtocol () import Ouroboros.Network.Block -import Ouroboros.Network.Magic -import Ouroboros.Network.Mux qualified as Mx import Ouroboros.Network.PeerSelection.LedgerPeers (LedgerPeersKind (..), - accumulateBigLedgerStake) -import Ouroboros.Network.PeerSelection.LedgerPeers.Type (RawBlockHash) + RawBlockHash, accumulateBigLedgerStake) import Ouroboros.Network.Point (Block (..)) import Ouroboros.Network.Protocol.LocalStateQuery.Client import Ouroboros.Network.Protocol.LocalStateQuery.Type @@ -58,34 +48,65 @@ data QueryError = UnsupportedEra instance Exception QueryError where --- TODO generalize to handle ledger eras other than Conway --- | connects the dmq node to cardano node via local state query --- and updates the node kernel with stake pool data necessary to perform message --- validation -- -cardanoClient - :: forall block query point crypto m. (MonadDelay m, MonadSTM m, MonadThrow m, MonadTime m) - => (block ~ CardanoBlock crypto, query ~ Query block, point ~ Point block) +-- Type aliases +-- + +-- | `LocalStateQuery` using `CardanoBlock` +type CardanoLocalStateQueryClient crypto m a = + LocalStateQueryClient (CardanoBlock crypto) + (Point (CardanoBlock crypto)) + (Query (CardanoBlock crypto)) m Void + +-- | `ClientStAcuiring` using `CardanoBlock` +type CardanoClientStAcquiring crypto m a = + ClientStAcquiring (CardanoBlock crypto) (Point (CardanoBlock crypto)) (Query (CardanoBlock crypto)) m a + +-- | `ClientStAcuired` using `CardanoBlock` +type CardanoClientStAcquired crypto m a = + ClientStAcquired (CardanoBlock crypto) (Point (CardanoBlock crypto)) (Query (CardanoBlock crypto)) m a + +-- | `ClientStQuerying` using `CardanoBlock` +type CardanoClientStQuerying crypto m a b = + ClientStQuerying (CardanoBlock crypto) (Point (CardanoBlock crypto)) (Query (CardanoBlock crypto)) m a b + + +-- | Local state query client which queries cardano node for +-- +-- * stake pool data (for signature validation) +-- * ledger peers (for peer selection) +-- +-- TODO generalize to handle ledger eras other than Conway. +-- +cardanoLocalStateQueryClient + :: forall crypto m. + ( MonadDelay m + , MonadSTM m + , MonadThrow m + , MonadTime m + ) => Tracer m TraceLocalStateQueryClient -> Bool -- ^ use ledger peers -> StakePools m -> StrictTVar m (Maybe UTCTime) -- ^ from node kernel - -> LocalStateQueryClient (CardanoBlock crypto) (Point block) (Query block) m Void -cardanoClient tracer ledgerPeers - StakePools { - stakePoolsVar, - ledgerPeersVar, - ledgerBigPeersVar - } - nextEpochVar = - LocalStateQueryClient (idle Nothing) + -> CardanoLocalStateQueryClient crypto m Void +cardanoLocalStateQueryClient + tracer ledgerPeers + StakePools { + stakePoolsVar, + ledgerPeersVar, + ledgerBigPeersVar + } + nextEpochVar + = + LocalStateQueryClient (idle Nothing) where idle mSystemStart = do traceWith tracer $ Acquiring mSystemStart -- FIXME: switched to volatiletip for prerelease testing purposes pure $ SendMsgAcquire VolatileTip {-ImmutableTip-} acquire where - acquire :: ClientStAcquiring block point query m Void + acquire :: CardanoClientStAcquiring crypto m Void acquire = ClientStAcquiring { recvMsgAcquired = let epochQry systemStart = pure $ @@ -101,8 +122,8 @@ cardanoClient tracer ledgerPeers } wrappingMismatch :: forall err r. - (r -> m (ClientStAcquired block point query m Void)) - -> ClientStQuerying block point query m Void (Either err r) + (r -> m (CardanoClientStAcquired crypto m Void)) + -> CardanoClientStQuerying crypto m Void (Either err r) wrappingMismatch k = ClientStQuerying $ either (const . throwIO . userError $ "mismatch era info") k @@ -135,12 +156,7 @@ cardanoClient tracer ledgerPeers queryCurrentEra :: SystemStart -> UTCTime - -> ClientStAcquired - (CardanoBlock crypto) - (Point (CardanoBlock crypto)) - (Query (CardanoBlock crypto)) - m - Void + -> CardanoClientStAcquired crypto m Void queryCurrentEra systemStart nextEpoch = SendMsgQuery (BlockQuery (QueryHardFork GetCurrentEra)) $ ClientStQuerying $ \era -> queryStakeSnapshots systemStart nextEpoch era @@ -150,12 +166,7 @@ cardanoClient tracer ledgerPeers :: SystemStart -> UTCTime -> EraIndex (CardanoEras crypto) - -> m (ClientStAcquired - (CardanoBlock crypto) - (Point (CardanoBlock crypto)) - (Query (CardanoBlock crypto)) - m - Void) + -> m (CardanoClientStAcquired crypto m Void) queryStakeSnapshots systemStart nextEpoch era = case era of EraByron{} -> throwIO UnsupportedEra @@ -176,12 +187,7 @@ cardanoClient tracer ledgerPeers where handleStakeSnapshots :: StakeSnapshots - -> m (ClientStAcquired - (CardanoBlock crypto) - (Point (CardanoBlock crypto)) - (Query (CardanoBlock crypto)) - m - Void) + -> m (CardanoClientStAcquired crypto m Void) handleStakeSnapshots StakeSnapshots { ssStakeSnapshots } = do atomically do writeTVar stakePoolsVar ssStakeSnapshots @@ -201,12 +207,7 @@ cardanoClient tracer ledgerPeers queryLedgerPeers :: SystemStart -> NominalDiffTime - -> ClientStAcquired - (CardanoBlock crypto) - (Point (CardanoBlock crypto)) - (Query (CardanoBlock crypto)) - m - Void + -> CardanoClientStAcquired crypto m Void queryLedgerPeers systemStart toNextEpoch = SendMsgQuery (BlockQuery . QueryIfCurrentConway $ GetLedgerPeerSnapshot SingAllLedgerPeers) $ wrappingMismatch handleLedgerPeers @@ -256,63 +257,7 @@ cardanoClient tracer ledgerPeers -- release, continue the loop in `idle` release :: SystemStart -> NominalDiffTime - -> ClientStAcquired - (CardanoBlock crypto) - (Point (CardanoBlock crypto)) - (Query (CardanoBlock crypto)) - m - Void + -> CardanoClientStAcquired crypto m Void release systemStart toNextEpoch = SendMsgRelease do threadDelay $ min (realToFrac toNextEpoch) 86400 -- TODO fuzz this? idle $ Just systemStart - - -connectToCardanoNode :: Tracer IO TraceLocalStateQueryClient - -> Bool -- ^ use ledger peers - -> LocalSnocket - -> FilePath - -> NetworkMagic - -> NodeKernel crypto ntnAddr IO - -> IO (Either SomeException Void) -connectToCardanoNode tracer ledgerPeers localSnocket' snocketPath networkMagic nodeKernel = - connectTo - localSnocket' - nullNetworkConnectTracers --debuggingNetworkConnectTracers - (combineVersions - [ simpleSingletonVersions - version - NodeToClientVersionData { - networkMagic - , query = False - } - \_version -> - Mx.OuroborosApplication - [ Mx.MiniProtocol - { miniProtocolNum = Mx.MiniProtocolNum 7 - , miniProtocolStart = Mx.StartEagerly - , miniProtocolLimits = - Mx.MiniProtocolLimits - { maximumIngressQueue = 0xffffffff - } - , miniProtocolRun = - Mx.InitiatorProtocolOnly - . Mx.mkMiniProtocolCbFromPeerSt - . const - $ ( nullTracer -- TODO: add tracer - , cStateQueryCodec - , StateIdle - , localStateQueryClientPeer - $ cardanoClient tracer - ledgerPeers - (stakePools nodeKernel) - (nextEpochVar nodeKernel) - ) - } - ] - | version <- [minBound..maxBound] - , let supportedVersionMap = supportedNodeToClientVersions (Proxy :: Proxy (CardanoBlock StandardCrypto)) - blk = supportedVersionMap Map.! version - Codecs {cStateQueryCodec} = - clientCodecs (pClientInfoCodecConfig . protocolClientInfoCardano $ EpochSlots 21600) blk version - ]) - snocketPath