Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 14 additions & 14 deletions sei-tendermint/internal/autobahn/producer/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,16 @@ func NewState(cfg *Config, txMempool *mempool.TxMempool, consensus *consensus.St
}

// makePayload constructs payload for the next produced block.
// It waits for enough transactions OR until `cfg.BlockInterval` passes.
func (s *State) makePayload(ctx context.Context) *types.Payload {
ctx, cancel := context.WithTimeout(ctx, s.cfg.BlockInterval)
defer cancel()

if s.txMempool.NumTxsNotPending() == 0 {
select {
case <-ctx.Done():
case <-s.txMempool.TxsAvailable():
}
// It waits for any transactions OR until `cfg.BlockInterval` passes.
func (s *State) makePayload(ctx context.Context) (*types.Payload, error) {
// Wait for transactions. We give up and produce an empty block if mempool is empty for
// cfg.BlockInterval.
_ = utils.WithTimeout(ctx, s.cfg.BlockInterval, func(ctx context.Context) error {
return s.txMempool.TxStore().WaitForTxs(ctx)
Comment thread
wen-coding marked this conversation as resolved.
})
// If the context has been cancelled though, we just fail.
if err := ctx.Err(); err != nil {
return nil, err
}

txs, gasEstimated := s.txMempool.PopTxs(mempool.ReapLimits{
Expand All @@ -83,16 +83,16 @@ func (s *State) makePayload(ctx context.Context) *types.Payload {
if err != nil {
panic(fmt.Errorf("PayloadBuilder{}.Build(): %w", err))
}
return payload
return payload, nil
}

// nextPayload constructs the payload for the next block.
// Wrapper of makePayload which ensures that the block is not empty (if required).
func (s *State) nextPayload(ctx context.Context) (*types.Payload, error) {
for {
payload := s.makePayload(ctx)
if ctx.Err() != nil {
return nil, ctx.Err()
payload, err := s.makePayload(ctx)
if err != nil {
return nil, err
}
if len(payload.Txs()) > 0 || s.cfg.AllowEmptyBlocks {
return payload, nil
Expand Down
28 changes: 4 additions & 24 deletions sei-tendermint/internal/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,12 +175,6 @@ type TxMempool struct {
// height defines the last block height process during Update()
height int64

// sizeBytes defines the total size of the mempool (sum of all tx bytes)
sizeBytes int64

// pendingSizeBytes defines the total size of the pending set (sum of all tx bytes)
pendingSizeBytes int64

// cache defines a fixed-size cache of already seen transactions as this
// reduces pressure on the proxyApp.
cache TxCache
Expand Down Expand Up @@ -302,27 +296,21 @@ func (txmp *TxMempool) NumTxsNotPending() int {
}

func (txmp *TxMempool) BytesNotPending() int64 {
txmp.txStore.mtx.RLock()
defer txmp.txStore.mtx.RUnlock()
totalBytes := int64(0)
for _, wrappedTx := range txmp.txStore.hashTxs {
totalBytes += int64(len(wrappedTx.tx))
}
return totalBytes
return txmp.txStore.AllTxsBytes()
}

func (txmp *TxMempool) TotalTxsBytesSize() int64 {
return txmp.BytesNotPending() + int64(txmp.pendingTxs.SizeBytes()) //nolint:gosec // mempool size is bounded by configured limits; no overflow risk
return txmp.BytesNotPending() + txmp.pendingTxs.SizeBytes()
}

// PendingSize returns the number of pending transactions in the mempool.
func (txmp *TxMempool) PendingSize() int { return txmp.pendingTxs.Size() }

// SizeBytes return the total sum in bytes of all the valid transactions in the
// mempool. It is thread-safe.
func (txmp *TxMempool) SizeBytes() int64 { return atomic.LoadInt64(&txmp.sizeBytes) }
func (txmp *TxMempool) SizeBytes() int64 { return txmp.txStore.AllTxsBytes() }

func (txmp *TxMempool) PendingSizeBytes() int64 { return atomic.LoadInt64(&txmp.pendingSizeBytes) }
func (txmp *TxMempool) PendingSizeBytes() int64 { return txmp.pendingTxs.SizeBytes() }

// WaitForNextTx waits until the next transaction is available for gossip.
// Returns the next valid transaction to gossip.
Expand Down Expand Up @@ -503,7 +491,6 @@ func (txmp *TxMempool) CheckTx(
removeHandler(true)
return nil, err
}
atomic.AddInt64(&txmp.pendingSizeBytes, int64(wtx.Size()))
if err := txmp.pendingTxs.Insert(wtx, res, txInfo); err != nil {
return nil, err
}
Expand Down Expand Up @@ -586,7 +573,6 @@ func (txmp *TxMempool) Flush() {
txmp.removeTx(wtx, false, false, true)
}

atomic.SwapInt64(&txmp.sizeBytes, 0)
txmp.cache.Reset()
}

Expand Down Expand Up @@ -1136,7 +1122,6 @@ func (txmp *TxMempool) insertTx(wtx *WrappedTx) bool {
wtx.gossipEl = gossipEl

txmp.metrics.InsertedTxs.Add(1)
atomic.AddInt64(&txmp.sizeBytes, int64(wtx.Size()))
return true
}

Expand All @@ -1158,8 +1143,6 @@ func (txmp *TxMempool) removeTx(wtx *WrappedTx, removeFromCache bool, shouldReen
wtx.gossipEl.DetachPrev()

txmp.metrics.RemovedTxs.Add(1)
atomic.AddInt64(&txmp.sizeBytes, int64(-wtx.Size()))

wtx.removeHandler(removeFromCache)

if shouldReenqueue {
Expand Down Expand Up @@ -1233,7 +1216,6 @@ func (txmp *TxMempool) purgeExpiredTxs(blockHeight int64) {

// remove pending txs that have expired
txmp.pendingTxs.PurgeExpired(blockHeight, now, func(wtx *WrappedTx) {
atomic.AddInt64(&txmp.pendingSizeBytes, int64(-wtx.Size()))
txmp.expire(blockHeight, wtx)
})
}
Expand Down Expand Up @@ -1266,13 +1248,11 @@ func (txmp *TxMempool) AppendCheckTxErr(existingLogs string, log string) string
func (txmp *TxMempool) handlePendingTransactions() {
accepted, rejected := txmp.pendingTxs.EvaluatePendingTransactions()
for _, tx := range accepted {
atomic.AddInt64(&txmp.pendingSizeBytes, int64(-tx.tx.Size()))
if err := txmp.addNewTransaction(tx.tx, tx.checkTxResponse.ResponseCheckTx, tx.txInfo); err != nil {
logger.Error("error adding pending transaction", "err", err)
}
}
for _, tx := range rejected {
atomic.AddInt64(&txmp.pendingSizeBytes, int64(-tx.tx.Size()))
if !txmp.config.KeepInvalidTxsInCache {
tx.tx.removeHandler(true)
}
Expand Down
15 changes: 8 additions & 7 deletions sei-tendermint/internal/mempool/reactor/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,6 @@ func (r *Reactor) processMempoolCh(ctx context.Context) error {
// handle PeerUpdate messages. When the reactor is stopped, we will catch the
// signal and close the p2p PeerUpdatesCh gracefully.
func (r *Reactor) processPeerUpdates(ctx context.Context) error {
if !r.cfg.Broadcast {
return nil
}
Comment thread
pompon0 marked this conversation as resolved.
return scope.Run(ctx, func(ctx context.Context, s scope.Scope) error {
recv := r.router.Subscribe()
peerRoutines := map[types.NodeID]context.CancelFunc{}
Expand All @@ -223,10 +220,14 @@ func (r *Reactor) processPeerUpdates(ctx context.Context) error {
pctx, pcancel := context.WithCancel(ctx)
peerRoutines[update.NodeID] = pcancel
r.ids.ReserveForPeer(update.NodeID)
s.Spawn(func() error {
r.broadcastTxRoutine(pctx, update.NodeID)
return nil
})
// We keep peer management even when broadcasting is disabled,
// so that failedCheckTxCounts continues to work as intended.
if r.cfg.Broadcast {
Comment thread
pompon0 marked this conversation as resolved.
s.Spawn(func() error {
r.broadcastTxRoutine(pctx, update.NodeID)
return nil
})
}

case p2p.PeerStatusDown:
r.ids.Reclaim(update.NodeID)
Expand Down
114 changes: 55 additions & 59 deletions sei-tendermint/internal/mempool/reactor/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,41 +84,34 @@ func convertTex(in []testTx) types.Txs {
}

func setupReactors(ctx context.Context, t *testing.T, numNodes int) *reactorTestSuite {
return setupReactorsWithTxConstraintsFetchers(ctx, t, numNodes, nil)
return setupReactorsWithConfig(ctx, t, numNodes, config.TestMempoolConfig(), mempool.NopTxConstraintsFetcher)
}

func setupReactorsWithTxConstraintsFetchers(
func setupReactorsWithConfig(
ctx context.Context,
t *testing.T,
numNodes int,
txConstraintsFetchers map[int]mempool.TxConstraintsFetcher,
cfg *config.MempoolConfig,
txConstraintsFetcher mempool.TxConstraintsFetcher,
) *reactorTestSuite {
t.Helper()

cfg, err := config.ResetTestRoot(t.TempDir(), strings.ReplaceAll(t.Name(), "/", "|"))
require.NoError(t, err)
t.Cleanup(func() { os.RemoveAll(cfg.RootDir) })

rts := &reactorTestSuite{
network: p2p.MakeTestNetwork(t, p2p.TestNetworkOptions{NumNodes: numNodes}),
reactors: make(map[types.NodeID]*Reactor, numNodes),
mempools: make(map[types.NodeID]*mempool.TxMempool, numNodes),
kvstores: make(map[types.NodeID]*kvstore.Application, numNodes),
}

for i, node := range rts.network.Nodes() {
for _, node := range rts.network.Nodes() {
nodeID := node.NodeID
rts.kvstores[nodeID] = kvstore.NewApplication()

app := rts.kvstores[nodeID]
txConstraintsFetcher := mempool.NopTxConstraintsFetcher
if customFetcher, ok := txConstraintsFetchers[i]; ok {
txConstraintsFetcher = customFetcher
}
txmp := setupMempool(t, app, 0, txConstraintsFetcher)
rts.mempools[nodeID] = txmp

reactor, err := NewReactor(config.TestMempoolConfig(), txmp, node.Router)
reactor, err := NewReactor(cfg, txmp, node.Router)
if err != nil {
t.Fatalf("NewReactor(): %v", err)
}
Expand Down Expand Up @@ -222,58 +215,61 @@ func TestReactorBroadcastTxs(t *testing.T) {
}

func TestReactorFailedCheckTxCountEvictsPeer(t *testing.T) {
ctx := t.Context()

rts := setupReactorsWithTxConstraintsFetchers(ctx, t, 2, map[int]mempool.TxConstraintsFetcher{
1: func() (mempool.TxConstraints, error) {
return mempool.TxConstraints{
MaxDataBytes: 10,
MaxGas: -1,
}, nil
},
})
t.Cleanup(leaktest.Check(t))

sender := rts.nodes[0]
receiver := rts.nodes[1]

receiverReactor := rts.reactors[receiver]
receiverReactor.cfg.CheckTxErrorBlacklistEnabled = true
receiverReactor.cfg.CheckTxErrorThreshold = 2

rts.start(t)
conn := rts.network.Node(receiver).WaitForConnAndGet(ctx, sender)

msgForTx := func(tx []byte) p2p.RecvMsg[*pb.Message] {
return p2p.RecvMsg[*pb.Message]{
From: sender,
Message: &pb.Message{
Sum: &pb.Message_Txs{
Txs: &pb.Txs{Txs: [][]byte{tx}},
},
},
}
}
for _, broadcast := range []bool{true, false} {
t.Run(fmt.Sprintf("broadcast=%v", broadcast), func(t *testing.T) {
ctx := t.Context()

cfg := config.TestMempoolConfig()
cfg.Broadcast = broadcast
cfg.CheckTxErrorBlacklistEnabled = true
cfg.CheckTxErrorThreshold = 2

rts := setupReactorsWithConfig(ctx, t, 2, cfg, func() (mempool.TxConstraints, error) {
return mempool.TxConstraints{
MaxDataBytes: 10,
MaxGas: -1,
}, nil
})
t.Cleanup(leaktest.Check(t))

sender := rts.nodes[0]
receiver := rts.nodes[1]
receiverReactor := rts.reactors[receiver]
rts.start(t)
conn := rts.network.Node(receiver).WaitForConnAndGet(ctx, sender)

msgForTx := func(tx []byte) p2p.RecvMsg[*pb.Message] {
return p2p.RecvMsg[*pb.Message]{
From: sender,
Message: &pb.Message{
Sum: &pb.Message_Txs{
Txs: &pb.Txs{Txs: [][]byte{tx}},
},
},
}
}

require.Eventually(t, func() bool {
return peerFailedCheckTxCount(receiverReactor, sender) == utils.Some(0)
}, time.Second, 50*time.Millisecond)
require.Eventually(t, func() bool {
return peerFailedCheckTxCount(receiverReactor, sender) == utils.Some(0)
}, time.Second, 50*time.Millisecond)

require.NoError(t, receiverReactor.handleMempoolMessage(ctx, msgForTx([]byte("good-1"))))
require.Equal(t, utils.Some(0), peerFailedCheckTxCount(receiverReactor, sender))
require.NoError(t, receiverReactor.handleMempoolMessage(ctx, msgForTx([]byte("good-1"))))
require.Equal(t, utils.Some(0), peerFailedCheckTxCount(receiverReactor, sender))

badTx := []byte("bad-transaction")
require.NoError(t, receiverReactor.handleMempoolMessage(ctx, msgForTx(badTx)))
require.Equal(t, utils.Some(1), peerFailedCheckTxCount(receiverReactor, sender))
badTx := []byte("bad-transaction")
require.NoError(t, receiverReactor.handleMempoolMessage(ctx, msgForTx(badTx)))
require.Equal(t, utils.Some(1), peerFailedCheckTxCount(receiverReactor, sender))

require.NoError(t, receiverReactor.handleMempoolMessage(ctx, msgForTx([]byte("good-2"))))
require.Equal(t, utils.Some(1), peerFailedCheckTxCount(receiverReactor, sender))
require.NoError(t, receiverReactor.handleMempoolMessage(ctx, msgForTx([]byte("good-2"))))
require.Equal(t, utils.Some(1), peerFailedCheckTxCount(receiverReactor, sender))

require.NoError(t, receiverReactor.handleMempoolMessage(ctx, msgForTx(badTx)))
require.Equal(t, utils.Some(2), peerFailedCheckTxCount(receiverReactor, sender))
require.NoError(t, receiverReactor.handleMempoolMessage(ctx, msgForTx(badTx)))
require.Equal(t, utils.Some(2), peerFailedCheckTxCount(receiverReactor, sender))

require.NoError(t, receiverReactor.handleMempoolMessage(ctx, msgForTx(badTx)))
rts.network.Node(receiver).WaitForDisconnect(ctx, conn)
require.NoError(t, receiverReactor.handleMempoolMessage(ctx, msgForTx(badTx)))
rts.network.Node(receiver).WaitForDisconnect(ctx, conn)
})
}
}

func TestReactorPeerDownClearsFailedCheckTxCount(t *testing.T) {
Expand Down
Loading
Loading