Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions cabal.project
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,18 @@ if(os(windows))
constraints:
bitvec -simd


source-repository-package
type: git
location: https://github.com/IntersectMBO/ouroboros-network
tag: 58f3ffb5abf74cd105f7c10593e01205dced7a9e
--sha256: sha256-x0feBtpO3zsvvq8Ocu5YEJFuRsVS/ptnLMEhXCAG+WI=
subdir:
acts-generic
cardano-diffusion
ouroboros-network
network-mux

if impl(ghc >= 9.12.0)
allow-newer: *:time,
*:nothunks,
Expand Down
17 changes: 4 additions & 13 deletions dmq-node/app/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ import DMQ.Diffusion.NodeKernel
import DMQ.Diffusion.PeerSelection (policy)
import DMQ.Handlers.TopLevel (toplevelExceptionHandler)
import DMQ.NodeToClient qualified as NtC
import DMQ.NodeToClient.LocalStateQueryClient
import DMQ.NodeToNode (NodeToNodeVersion, dmqCodecs, dmqLimitsAndTimeouts,
ntnApps)
import DMQ.Policy qualified as Policy
Expand Down Expand Up @@ -93,8 +92,7 @@ runDMQ commandLineConfig = do
dmqConfig@Configuration {
dmqcTopologyFile = I topologyFile,
dmqcCardanoNodeSocket = I socketPath,
dmqcVersion = I version,
dmqcLedgerPeers = I ledgerPeers
dmqcVersion = I version
} = fromRight mempty config'
<> commandLineConfig
`act`
Expand Down Expand Up @@ -122,7 +120,6 @@ runDMQ commandLineConfig = do

( dmqTracers@DMQTracers {
dmqStartupTracer,
localStateQueryClientTracer,
sigValidationTracer,
localSigValidationTracer,
cardanoNodeHandshakeTracer
Expand Down Expand Up @@ -170,18 +167,12 @@ runDMQ commandLineConfig = do

-- TODO: this might not work, since `ouroboros-network` creates its own IO Completion Port.
withIOManager \iocp -> do
let localSnocket' = localSnocket iocp
mkStakePoolMonitor = connectToCardanoNode
localStateQueryClientTracer
ledgerPeers
localSnocket'
socketPath

withNodeKernel @StandardCrypto
dmqTracers
(localSnocket iocp)
makeLocalBearer
dmqConfig
psRng
mkStakePoolMonitor $ \nodeKernel -> do
psRng $ \nodeKernel -> do
dmqDiffusionConfiguration <-
mkDiffusionConfiguration dmqConfig nt nodeKernel.stakePools.ledgerBigPeersVar

Expand Down
3 changes: 2 additions & 1 deletion dmq-node/dmq-node.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ library
cardano-ledger-byron,
cardano-ledger-core,
cardano-ledger-shelley,
cardano-protocol-tpraos,
cardano-slotting,
cardano-strict-containers,
cborg >=0.2.1 && <0.3,
Expand All @@ -136,7 +137,7 @@ library
singletons,
text >=1.2.4 && <2.2,
time >=1.12 && <1.15,
trace-dispatcher ^>=2.12.0,
trace-dispatcher ^>=2.11.0 || ^>=2.12.0,
transformers,
typed-protocols:{typed-protocols, cborg} ^>=1.2,

Expand Down
141 changes: 124 additions & 17 deletions dmq-node/src/DMQ/Diffusion/NodeKernel.hs
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE PackageImports #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE PackageImports #-}
{-# LANGUAGE RankNTypes #-}

module DMQ.Diffusion.NodeKernel
( module DMQ.Diffusion.NodeKernel.Types
, withNodeKernel
) where

import Control.Applicative (Alternative)
import Control.Concurrent.Class.MonadMVar
import Control.Concurrent.Class.MonadSTM.Strict
import Control.Monad.Class.MonadAsync
import Control.Monad.Class.MonadST
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTime.SI
import Control.Monad.Class.MonadTimer.SI
Expand All @@ -18,33 +22,57 @@ import "contra-tracer" Control.Tracer (nullTracer)
import Data.Function (on)
import Data.Hashable
import Data.Map.Strict qualified as Map
import Data.Proxy
import Data.Sequence (Seq)
import Data.Sequence qualified as Seq
import Data.Set (Set)
import Data.Set qualified as Set
import Data.Time.Clock.POSIX (POSIXTime)
import Data.Time.Clock.POSIX qualified as Time
import Data.Void (Void)
import Data.Void (Void, absurd)
import System.Random (StdGen)
import System.Random qualified as Random

import Network.Mux qualified as Mx

import Cardano.Chain.Slotting (EpochSlots (..))
import Cardano.Network.NodeToClient qualified as Cardano.NtoC
import Cardano.Protocol.Crypto qualified as Cardano (StandardCrypto)

import Ouroboros.Consensus.Cardano.Block (CardanoBlock)
import Ouroboros.Consensus.Cardano.Node
import Ouroboros.Consensus.Network.NodeToClient
import Ouroboros.Consensus.Node.NetworkProtocolVersion
import Ouroboros.Consensus.Node.ProtocolInfo

import Ouroboros.Network.BlockFetch (newFetchClientRegistry)
import Ouroboros.Network.Magic (NetworkMagic (..))
import Ouroboros.Network.Handshake.Queryable (Queryable (..))
import Ouroboros.Network.Mux qualified as Mx
import Ouroboros.Network.PeerSelection.Governor.Types
(makePublicPeerSelectionStateVar)
import Ouroboros.Network.PeerSharing (newPeerSharingAPI, newPeerSharingRegistry,
ps_POLICY_PEER_SHARE_MAX_PEERS, ps_POLICY_PEER_SHARE_STICKY_TIME)
import Ouroboros.Network.Protocol.Handshake (Acceptable (..))
import Ouroboros.Network.Protocol.Handshake.Codec (cborTermVersionDataCodec,
noTimeLimitsHandshake)
import Ouroboros.Network.Protocol.LocalStateQuery.Client
import Ouroboros.Network.Protocol.LocalStateQuery.Type
import Ouroboros.Network.Snocket (Snocket, localAddressFromPath)
import Ouroboros.Network.Socket (ConnectToArgs (..),
HandshakeCallbacks (HandshakeCallbacks), connectToNode)
import Ouroboros.Network.TxSubmission.Inbound.V2
import Ouroboros.Network.TxSubmission.Mempool.Simple (Mempool (..),
MempoolSeq (..), WithIndex (..))
import Ouroboros.Network.TxSubmission.Mempool.Simple qualified as Mempool

import DMQ.Configuration
import DMQ.Diffusion.NodeKernel.Types
import DMQ.NodeToClient.LocalStateQueryClient
import DMQ.Policy qualified as Policy
import DMQ.Protocol.SigSubmission.Type (Sig (sigExpiresAt, sigId), SigId)
import DMQ.Tracer


newNodeKernel :: forall crypto ntnAddr m.
( MonadLabelledSTM m
, MonadMVar m
Expand Down Expand Up @@ -107,30 +135,43 @@ newNodeKernel rng = do


withNodeKernel :: forall crypto ntnAddr ntcAddr m a.
( MonadAsync m
, MonadFork m
, MonadDelay m
, MonadLabelledSTM m
, MonadMask m
, MonadMVar m
, MonadTime m
( Alternative (STM m)
, MonadAsync m
, MonadEvaluate m
, MonadFork m
, MonadDelay m
, MonadLabelledSTM m
, MonadMask m
, MonadMVar m
, Mx.MonadReadBuffer m
, MonadST m
, MonadThrow (STM m)
, MonadTime m
, MonadTimer m
, Ord ntnAddr
, Hashable ntnAddr
)
=> DMQTracers crypto ntnAddr ntcAddr m
-> Snocket m Cardano.NtoC.LocalSocket LocalAddress
-> Mx.MakeBearer m Cardano.NtoC.LocalSocket
-> Configuration
-> StdGen
-> (NetworkMagic -> NodeKernel crypto ntnAddr m -> m (Either SomeException Void))
-> (NodeKernel crypto ntnAddr m -> m a)
-- ^ as soon as the callback exits the `mempoolWorker` and all
-- decision logic threads will be killed
-> m a
withNodeKernel DMQTracers { sigSubmissionLogicTracer }
withNodeKernel DMQTracers { sigSubmissionLogicTracer,
localStateQueryClientTracer
}
localSnocket
mkLocalBearer
Configuration {
dmqcCardanoNetworkMagic = I networkMagic
dmqcCardanoNetworkMagic = I networkMagic,
dmqcCardanoNodeSocket = I cardanoNodeSocketPath,
dmqcLedgerPeers = I ledgerPeers
}
rng
mkStakePoolMonitor k = do
k = do
nodeKernel@NodeKernel { mempool,
sigChannelVar,
sigSharedTxStateVar
Expand All @@ -145,11 +186,77 @@ withNodeKernel DMQTracers { sigSubmissionLogicTracer }
sigChannelVar
sigSharedTxStateVar)
$ \sigLogicThread ->
withAsync (mkStakePoolMonitor networkMagic nodeKernel) \spmAid -> do
withAsync (connectToCardanoNode nodeKernel) \spmAid -> do
link mempoolThread
link sigLogicThread
link spmAid
k nodeKernel
where
connectToCardanoNode :: NodeKernel crypto ntnAddr m
-> m (Either SomeException Void)
connectToCardanoNode nodeKernel =
fmap fn <$>
connectToNode
localSnocket
mkLocalBearer
ConnectToArgs {
ctaHandshakeCodec = Cardano.NtoC.nodeToClientHandshakeCodec,
ctaHandshakeTimeLimits = noTimeLimitsHandshake,
ctaVersionDataCodec = cborTermVersionDataCodec Cardano.NtoC.nodeToClientCodecCBORTerm,
ctaConnectTracers = Cardano.NtoC.nullNetworkConnectTracers, --debuggingNetworkConnectTracers,
ctaHandshakeCallbacks = HandshakeCallbacks acceptableVersion queryVersion
}
(\_ -> return ())
(Cardano.NtoC.combineVersions
[ Cardano.NtoC.simpleSingletonVersions
version
Cardano.NtoC.NodeToClientVersionData {
Cardano.NtoC.networkMagic
, Cardano.NtoC.query = False
}
\_version ->
Mx.OuroborosApplication
[ Mx.MiniProtocol
{ Mx.miniProtocolNum = Mx.MiniProtocolNum 7
, Mx.miniProtocolStart = Mx.StartEagerly
, Mx.miniProtocolLimits =
Mx.MiniProtocolLimits
{ Mx.maximumIngressQueue = 0xffffffff
}
, Mx.miniProtocolRun =
Mx.InitiatorProtocolOnly
. Mx.mkMiniProtocolCbFromPeerSt
. const
$ ( nullTracer -- TODO: add tracer
, cStateQueryCodec
, StateIdle
, localStateQueryClientPeer $
cardanoLocalStateQueryClient
localStateQueryClientTracer
ledgerPeers
(stakePools nodeKernel)
(nextEpochVar nodeKernel)
)
}
]
| version <- [minBound..maxBound]
, let -- NOTE: the query protocol is running using
-- `Cardano.StandardCrypto`, while `dmq-node` is using
-- `StandardCrypto` defined in `kes-agent-krypto`. A priori
-- cryptography could differ but it shouldn't be a problem. We
-- are querying
supportedVersionMap =
supportedNodeToClientVersions (Proxy :: Proxy (CardanoBlock Cardano.StandardCrypto))
blk = supportedVersionMap Map.! version
Codecs {cStateQueryCodec} =
clientCodecs (pClientInfoCodecConfig . protocolClientInfoCardano $ EpochSlots 21600)
blk version
])
Nothing
(localAddressFromPath cardanoNodeSocketPath)
where
fn :: forall x. Either x Void -> x
fn = either id absurd


mempoolWorker :: forall crypto m.
Expand Down
Loading
Loading