From 0d50de2c44d771445a7d80f4c9161995996bb67e Mon Sep 17 00:00:00 2001 From: Grzegorz Prusak Date: Tue, 21 Apr 2026 14:21:57 +0200 Subject: [PATCH 1/2] applied comments --- .../internal/autobahn/producer/state.go | 28 +- sei-tendermint/internal/mempool/mempool.go | 28 +- .../internal/mempool/reactor/reactor.go | 15 +- sei-tendermint/internal/mempool/tx.go | 343 ++++++++++-------- sei-tendermint/internal/mempool/tx_test.go | 46 ++- 5 files changed, 240 insertions(+), 220 deletions(-) diff --git a/sei-tendermint/internal/autobahn/producer/state.go b/sei-tendermint/internal/autobahn/producer/state.go index 649c079bd8..caf21b5416 100644 --- a/sei-tendermint/internal/autobahn/producer/state.go +++ b/sei-tendermint/internal/autobahn/producer/state.go @@ -49,16 +49,16 @@ func NewState(cfg *Config, txMempool *mempool.TxMempool, consensus *consensus.St } // makePayload constructs payload for the next produced block. -// It waits for enough transactions OR until `cfg.BlockInterval` passes. -func (s *State) makePayload(ctx context.Context) *types.Payload { - ctx, cancel := context.WithTimeout(ctx, s.cfg.BlockInterval) - defer cancel() - - if s.txMempool.NumTxsNotPending() == 0 { - select { - case <-ctx.Done(): - case <-s.txMempool.TxsAvailable(): - } +// It waits for any transactions OR until `cfg.BlockInterval` passes. +func (s *State) makePayload(ctx context.Context) (*types.Payload, error) { + // Wait for transactions. We give up and produce an empty block if mempool is empty for + // cfg.BlockInterval. + _ = utils.WithTimeout(ctx, s.cfg.BlockInterval, func(ctx context.Context) error { + return s.txMempool.TxStore().WaitForTxs(ctx) + }) + // If the context has been cancelled though, we just fail. + if err := ctx.Err(); err != nil { + return nil, err } txs, gasEstimated := s.txMempool.PopTxs(mempool.ReapLimits{ @@ -83,16 +83,16 @@ func (s *State) makePayload(ctx context.Context) *types.Payload { if err != nil { panic(fmt.Errorf("PayloadBuilder{}.Build(): %w", err)) } - return payload + return payload, nil } // nextPayload constructs the payload for the next block. // Wrapper of makePayload which ensures that the block is not empty (if required). func (s *State) nextPayload(ctx context.Context) (*types.Payload, error) { for { - payload := s.makePayload(ctx) - if ctx.Err() != nil { - return nil, ctx.Err() + payload, err := s.makePayload(ctx) + if err != nil { + return nil, err } if len(payload.Txs()) > 0 || s.cfg.AllowEmptyBlocks { return payload, nil diff --git a/sei-tendermint/internal/mempool/mempool.go b/sei-tendermint/internal/mempool/mempool.go index 20416b2c15..72ac844621 100644 --- a/sei-tendermint/internal/mempool/mempool.go +++ b/sei-tendermint/internal/mempool/mempool.go @@ -175,12 +175,6 @@ type TxMempool struct { // height defines the last block height process during Update() height int64 - // sizeBytes defines the total size of the mempool (sum of all tx bytes) - sizeBytes int64 - - // pendingSizeBytes defines the total size of the pending set (sum of all tx bytes) - pendingSizeBytes int64 - // cache defines a fixed-size cache of already seen transactions as this // reduces pressure on the proxyApp. cache TxCache @@ -302,17 +296,11 @@ func (txmp *TxMempool) NumTxsNotPending() int { } func (txmp *TxMempool) BytesNotPending() int64 { - txmp.txStore.mtx.RLock() - defer txmp.txStore.mtx.RUnlock() - totalBytes := int64(0) - for _, wrappedTx := range txmp.txStore.hashTxs { - totalBytes += int64(len(wrappedTx.tx)) - } - return totalBytes + return txmp.txStore.AllTxsBytes() } func (txmp *TxMempool) TotalTxsBytesSize() int64 { - return txmp.BytesNotPending() + int64(txmp.pendingTxs.SizeBytes()) //nolint:gosec // mempool size is bounded by configured limits; no overflow risk + return txmp.BytesNotPending() + txmp.pendingTxs.SizeBytes() } // PendingSize returns the number of pending transactions in the mempool. @@ -320,9 +308,9 @@ func (txmp *TxMempool) PendingSize() int { return txmp.pendingTxs.Size() } // SizeBytes return the total sum in bytes of all the valid transactions in the // mempool. It is thread-safe. -func (txmp *TxMempool) SizeBytes() int64 { return atomic.LoadInt64(&txmp.sizeBytes) } +func (txmp *TxMempool) SizeBytes() int64 { return txmp.txStore.AllTxsBytes() } -func (txmp *TxMempool) PendingSizeBytes() int64 { return atomic.LoadInt64(&txmp.pendingSizeBytes) } +func (txmp *TxMempool) PendingSizeBytes() int64 { return txmp.pendingTxs.SizeBytes() } // WaitForNextTx waits until the next transaction is available for gossip. // Returns the next valid transaction to gossip. @@ -507,7 +495,6 @@ func (txmp *TxMempool) CheckTx( removeHandler(true) return err } - atomic.AddInt64(&txmp.pendingSizeBytes, int64(wtx.Size())) if err := txmp.pendingTxs.Insert(wtx, res, txInfo); err != nil { return err } @@ -594,7 +581,6 @@ func (txmp *TxMempool) Flush() { txmp.removeTx(wtx, false, false, true) } - atomic.SwapInt64(&txmp.sizeBytes, 0) txmp.cache.Reset() } @@ -1144,7 +1130,6 @@ func (txmp *TxMempool) insertTx(wtx *WrappedTx) bool { wtx.gossipEl = gossipEl txmp.metrics.InsertedTxs.Add(1) - atomic.AddInt64(&txmp.sizeBytes, int64(wtx.Size())) return true } @@ -1166,8 +1151,6 @@ func (txmp *TxMempool) removeTx(wtx *WrappedTx, removeFromCache bool, shouldReen wtx.gossipEl.DetachPrev() txmp.metrics.RemovedTxs.Add(1) - atomic.AddInt64(&txmp.sizeBytes, int64(-wtx.Size())) - wtx.removeHandler(removeFromCache) if shouldReenqueue { @@ -1241,7 +1224,6 @@ func (txmp *TxMempool) purgeExpiredTxs(blockHeight int64) { // remove pending txs that have expired txmp.pendingTxs.PurgeExpired(blockHeight, now, func(wtx *WrappedTx) { - atomic.AddInt64(&txmp.pendingSizeBytes, int64(-wtx.Size())) txmp.expire(blockHeight, wtx) }) } @@ -1274,13 +1256,11 @@ func (txmp *TxMempool) AppendCheckTxErr(existingLogs string, log string) string func (txmp *TxMempool) handlePendingTransactions() { accepted, rejected := txmp.pendingTxs.EvaluatePendingTransactions() for _, tx := range accepted { - atomic.AddInt64(&txmp.pendingSizeBytes, int64(-tx.tx.Size())) if err := txmp.addNewTransaction(tx.tx, tx.checkTxResponse.ResponseCheckTx, tx.txInfo); err != nil { logger.Error("error adding pending transaction", "err", err) } } for _, tx := range rejected { - atomic.AddInt64(&txmp.pendingSizeBytes, int64(-tx.tx.Size())) if !txmp.config.KeepInvalidTxsInCache { tx.tx.removeHandler(true) } diff --git a/sei-tendermint/internal/mempool/reactor/reactor.go b/sei-tendermint/internal/mempool/reactor/reactor.go index bdc8dc6a88..9f2e314f3a 100644 --- a/sei-tendermint/internal/mempool/reactor/reactor.go +++ b/sei-tendermint/internal/mempool/reactor/reactor.go @@ -201,9 +201,6 @@ func (r *Reactor) processMempoolCh(ctx context.Context) error { // handle PeerUpdate messages. When the reactor is stopped, we will catch the // signal and close the p2p PeerUpdatesCh gracefully. func (r *Reactor) processPeerUpdates(ctx context.Context) error { - if !r.cfg.Broadcast { - return nil - } return scope.Run(ctx, func(ctx context.Context, s scope.Scope) error { recv := r.router.Subscribe() peerRoutines := map[types.NodeID]context.CancelFunc{} @@ -222,10 +219,14 @@ func (r *Reactor) processPeerUpdates(ctx context.Context) error { pctx, pcancel := context.WithCancel(ctx) peerRoutines[update.NodeID] = pcancel r.ids.ReserveForPeer(update.NodeID) - s.Spawn(func() error { - r.broadcastTxRoutine(pctx, update.NodeID) - return nil - }) + // We keep peer management even then broadcasting is disabled, + // so that failedCheckTxCounts WAI. + if r.cfg.Broadcast { + s.Spawn(func() error { + r.broadcastTxRoutine(pctx, update.NodeID) + return nil + }) + } case p2p.PeerStatusDown: r.ids.Reclaim(update.NodeID) diff --git a/sei-tendermint/internal/mempool/tx.go b/sei-tendermint/internal/mempool/tx.go index 3b632e41b4..de64040b71 100644 --- a/sei-tendermint/internal/mempool/tx.go +++ b/sei-tendermint/internal/mempool/tx.go @@ -1,8 +1,9 @@ package mempool import ( + "context" "errors" - "sync" + "sync/atomic" "time" abci "github.com/sei-protocol/sei-chain/sei-tendermint/abci/types" @@ -92,6 +93,12 @@ func (wtx *WrappedTx) Size() int { return len(wtx.tx) } +type txStoreInner struct { + hashTxs map[types.TxKey]*WrappedTx // primary index + senderTxs map[string]*WrappedTx // sender is defined by the ABCI application + sizeBytes utils.AtomicSend[int64] +} + // TxStore implements a thread-safe mapping of valid transaction(s). // // NOTE: @@ -99,75 +106,86 @@ func (wtx *WrappedTx) Size() int { // access is not allowed. Regardless, it is not expected for the mempool to // need mutative access. type TxStore struct { - mtx sync.RWMutex - hashTxs map[types.TxKey]*WrappedTx // primary index - senderTxs map[string]*WrappedTx // sender is defined by the ABCI application + inner utils.RWMutex[*txStoreInner] + sizeBytes utils.AtomicRecv[int64] } func NewTxStore() *TxStore { - return &TxStore{ + inner := &txStoreInner{ senderTxs: make(map[string]*WrappedTx), hashTxs: make(map[types.TxKey]*WrappedTx), + sizeBytes: utils.NewAtomicSend[int64](0), + } + return &TxStore{ + inner: utils.NewRWMutex(inner), + sizeBytes: inner.sizeBytes.Subscribe(), } } // Size returns the total number of transactions in the store. func (txs *TxStore) Size() int { - txs.mtx.RLock() - defer txs.mtx.RUnlock() + for inner := range txs.inner.RLock() { + return len(inner.hashTxs) + } + panic("unreachable") +} - return len(txs.hashTxs) +// AllTxsBytes returns the total size in bytes of all transactions in the store. +func (txs *TxStore) AllTxsBytes() int64 { + return txs.sizeBytes.Load() +} + +// WaitForTxs waits until the store becomes non-empty. +func (txs *TxStore) WaitForTxs(ctx context.Context) error { + _, err := txs.sizeBytes.Wait(ctx, func(sizeBytes int64) bool { return sizeBytes > 0 }) + return err } // GetAllTxs returns all the transactions currently in the store. func (txs *TxStore) GetAllTxs() []*WrappedTx { - txs.mtx.RLock() - defer txs.mtx.RUnlock() - - wTxs := make([]*WrappedTx, len(txs.hashTxs)) - i := 0 - for _, wtx := range txs.hashTxs { - wTxs[i] = wtx - i++ + for inner := range txs.inner.RLock() { + wTxs := make([]*WrappedTx, len(inner.hashTxs)) + i := 0 + for _, wtx := range inner.hashTxs { + wTxs[i] = wtx + i++ + } + return wTxs } - - return wTxs + panic("unreachable") } // GetTxBySender returns a *WrappedTx by the transaction's sender property // defined by the ABCI application. func (txs *TxStore) GetTxBySender(sender string) *WrappedTx { - txs.mtx.RLock() - defer txs.mtx.RUnlock() - - return txs.senderTxs[sender] + for inner := range txs.inner.RLock() { + return inner.senderTxs[sender] + } + panic("unreachable") } // GetTxByHash returns a *WrappedTx by the transaction's hash. func (txs *TxStore) GetTxByHash(hash types.TxKey) *WrappedTx { - txs.mtx.RLock() - defer txs.mtx.RUnlock() - - return txs.hashTxs[hash] + for inner := range txs.inner.RLock() { + return inner.hashTxs[hash] + } + panic("unreachable") } // IsTxRemoved returns true if a transaction by hash is marked as removed and // false otherwise. func (txs *TxStore) IsTxRemoved(wtx *WrappedTx) bool { - txs.mtx.RLock() - defer txs.mtx.RUnlock() - - // if this instance has already been marked, return true - if wtx.removed { - return true - } - - // otherwise if the same hash exists, return its state - wtx, ok := txs.hashTxs[wtx.hash] - if ok { - return wtx.removed + for inner := range txs.inner.RLock() { + // if this instance has already been marked, return true + if wtx.removed { + return true + } + // otherwise if the same hash exists, return its state + wtx, ok := inner.hashTxs[wtx.hash] + if ok { + return wtx.removed + } } - // otherwise we haven't seen this tx return false } @@ -176,43 +194,45 @@ func (txs *TxStore) IsTxRemoved(wtx *WrappedTx) bool { // non-empty sender, we additionally store the transaction by the sender as // defined by the ABCI application. func (txs *TxStore) SetTx(wtx *WrappedTx) { - txs.mtx.Lock() - defer txs.mtx.Unlock() - - if len(wtx.sender) > 0 { - txs.senderTxs[wtx.sender] = wtx + for inner := range txs.inner.Lock() { + existing := inner.hashTxs[wtx.tx.Key()] + if len(wtx.sender) > 0 { + inner.senderTxs[wtx.sender] = wtx + } + inner.hashTxs[wtx.tx.Key()] = wtx + if existing == nil { + inner.sizeBytes.Store(inner.sizeBytes.Load() + int64(wtx.Size())) + } } - - txs.hashTxs[wtx.tx.Key()] = wtx } // RemoveTx removes a *WrappedTx from the transaction store. It deletes all // indexes of the transaction. func (txs *TxStore) RemoveTx(wtx *WrappedTx) { - txs.mtx.Lock() - defer txs.mtx.Unlock() - - if len(wtx.sender) > 0 { - delete(txs.senderTxs, wtx.sender) + for inner := range txs.inner.Lock() { + if len(wtx.sender) > 0 { + delete(inner.senderTxs, wtx.sender) + } + if _, ok := inner.hashTxs[wtx.tx.Key()]; ok { + delete(inner.hashTxs, wtx.tx.Key()) + inner.sizeBytes.Store(inner.sizeBytes.Load() - int64(wtx.Size())) + } + wtx.removed = true } - - delete(txs.hashTxs, wtx.tx.Key()) - wtx.removed = true } // TxHasPeer returns true if a transaction by hash has a given peer ID and false // otherwise. If the transaction does not exist, false is returned. func (txs *TxStore) TxHasPeer(hash types.TxKey, peerID uint16) bool { - txs.mtx.RLock() - defer txs.mtx.RUnlock() - - wtx := txs.hashTxs[hash] - if wtx == nil { - return false + for inner := range txs.inner.RLock() { + wtx := inner.hashTxs[hash] + if wtx == nil { + return false + } + _, ok := wtx.peers[peerID] + return ok } - - _, ok := wtx.peers[peerID] - return ok + panic("unreachable") } // GetOrSetPeerByTxHash looks up a WrappedTx by transaction hash and adds the @@ -221,24 +241,24 @@ func (txs *TxStore) TxHasPeer(hash types.TxKey, peerID uint16) bool { // and false otherwise. If the transaction does not exist by hash, we return // (nil, false). func (txs *TxStore) GetOrSetPeerByTxHash(hash types.TxKey, peerID uint16) (*WrappedTx, bool) { - txs.mtx.Lock() - defer txs.mtx.Unlock() + for inner := range txs.inner.Lock() { + wtx := inner.hashTxs[hash] + if wtx == nil { + return nil, false + } - wtx := txs.hashTxs[hash] - if wtx == nil { - return nil, false - } + if wtx.peers == nil { + wtx.peers = make(map[uint16]struct{}) + } - if wtx.peers == nil { - wtx.peers = make(map[uint16]struct{}) - } + if _, ok := wtx.peers[peerID]; ok { + return wtx, true + } - if _, ok := wtx.peers[peerID]; ok { - return wtx, true + wtx.peers[peerID] = struct{}{} + return wtx, false } - - wtx.peers[peerID] = struct{}{} - return wtx, false + panic("unreachable") } // WrappedTxList orders transactions in the order that they arrived. @@ -309,10 +329,13 @@ func (wtl *WrappedTxList) Purge(minTime utils.Option[time.Time], minHeight utils } type PendingTxs struct { - mtx *sync.RWMutex - txs []TxWithResponse + inner utils.RWMutex[*pendingTxsInner] config *Config - sizeBytes uint64 + sizeBytes atomic.Int64 +} + +type pendingTxsInner struct { + txs []TxWithResponse } type TxWithResponse struct { @@ -323,10 +346,10 @@ type TxWithResponse struct { func NewPendingTxs(conf *Config) *PendingTxs { return &PendingTxs{ - mtx: &sync.RWMutex{}, - txs: []TxWithResponse{}, - config: conf, - sizeBytes: 0, + inner: utils.NewRWMutex(&pendingTxsInner{ + txs: []TxWithResponse{}, + }), + config: conf, } } func (p *PendingTxs) EvaluatePendingTransactions() ( @@ -334,123 +357,121 @@ func (p *PendingTxs) EvaluatePendingTransactions() ( rejectedTxs []TxWithResponse, ) { poppedIndices := []int{} - p.mtx.Lock() - defer p.mtx.Unlock() - for i := 0; i < len(p.txs); i++ { - switch p.txs[i].checkTxResponse.Checker() { - case abci.Accepted: - acceptedTxs = append(acceptedTxs, p.txs[i]) - poppedIndices = append(poppedIndices, i) - case abci.Rejected: - rejectedTxs = append(rejectedTxs, p.txs[i]) - poppedIndices = append(poppedIndices, i) + for inner := range p.inner.Lock() { + for i := 0; i < len(inner.txs); i++ { + switch inner.txs[i].checkTxResponse.Checker() { + case abci.Accepted: + acceptedTxs = append(acceptedTxs, inner.txs[i]) + poppedIndices = append(poppedIndices, i) + case abci.Rejected: + rejectedTxs = append(rejectedTxs, inner.txs[i]) + poppedIndices = append(poppedIndices, i) + } } + p.popTxsAtIndices(inner, poppedIndices) + return } - p.popTxsAtIndices(poppedIndices) - return + panic("unreachable") } -// assume mtx is already acquired -func (p *PendingTxs) popTxsAtIndices(indices []int) { +// Assumes the pending tx store is already write-locked. +func (p *PendingTxs) popTxsAtIndices(inner *pendingTxsInner, indices []int) { if len(indices) == 0 { return } - newTxs := make([]TxWithResponse, 0, max(0, len(p.txs)-len(indices))) + newTxs := make([]TxWithResponse, 0, max(0, len(inner.txs)-len(indices))) start := 0 for _, idx := range indices { if idx <= start-1 { panic("indices popped from pending tx store should be sorted without duplicate") } - if idx >= len(p.txs) { + if idx >= len(inner.txs) { panic("indices popped from pending tx store out of range") } - p.sizeBytes -= uint64(p.txs[idx].tx.Size()) //nolint:gosec // Size() is non-negative - newTxs = append(newTxs, p.txs[start:idx]...) + p.sizeBytes.Add(int64(-inner.txs[idx].tx.Size())) + newTxs = append(newTxs, inner.txs[start:idx]...) start = idx + 1 } - newTxs = append(newTxs, p.txs[start:]...) - p.txs = newTxs + newTxs = append(newTxs, inner.txs[start:]...) + inner.txs = newTxs } func (p *PendingTxs) Insert(tx *WrappedTx, resCheckTx *abci.ResponseCheckTxV2, txInfo TxInfo) error { - p.mtx.Lock() - defer p.mtx.Unlock() + for inner := range p.inner.Lock() { + if len(inner.txs) >= p.config.PendingSize || int64(tx.Size())+p.sizeBytes.Load() > p.config.MaxPendingTxsBytes { + return errors.New("pending store is full") + } - if len(p.txs) >= p.config.PendingSize || uint64(tx.Size())+p.sizeBytes > uint64(p.config.MaxPendingTxsBytes) { //nolint:gosec // Size() and MaxPendingTxsBytes are non-negative validated values - return errors.New("pending store is full") + inner.txs = append(inner.txs, TxWithResponse{ + tx: tx, + checkTxResponse: resCheckTx, + txInfo: txInfo, + }) + p.sizeBytes.Add(int64(tx.Size())) + return nil } - - p.txs = append(p.txs, TxWithResponse{ - tx: tx, - checkTxResponse: resCheckTx, - txInfo: txInfo, - }) - p.sizeBytes += uint64(tx.Size()) //nolint:gosec // Size() is non-negative - return nil + panic("unreachable") } -func (p *PendingTxs) SizeBytes() uint64 { - p.mtx.RLock() - defer p.mtx.RUnlock() - return p.sizeBytes -} +func (p *PendingTxs) SizeBytes() int64 { return p.sizeBytes.Load() } func (p *PendingTxs) Peek(max int) []TxWithResponse { - p.mtx.RLock() - defer p.mtx.RUnlock() - // priority is fifo - if max > len(p.txs) { - return p.txs + for inner := range p.inner.RLock() { + // priority is fifo + if max > len(inner.txs) { + return inner.txs + } + return inner.txs[:max] } - return p.txs[:max] + panic("unreachable") } func (p *PendingTxs) Size() int { - p.mtx.RLock() - defer p.mtx.RUnlock() - return len(p.txs) + for inner := range p.inner.RLock() { + return len(inner.txs) + } + panic("unreachable") } func (p *PendingTxs) PurgeExpired(blockHeight int64, now time.Time, cb func(wtx *WrappedTx)) { - p.mtx.Lock() - defer p.mtx.Unlock() - - if len(p.txs) == 0 { - return - } + for inner := range p.inner.Lock() { + if len(inner.txs) == 0 { + return + } - // txs retains the ordering of insertion - if p.config.TTLNumBlocks > 0 { - idxFirstNotExpiredTx := len(p.txs) - for i, ptx := range p.txs { - // once found, we can break because these are ordered - if (blockHeight - ptx.tx.height) <= p.config.TTLNumBlocks { - idxFirstNotExpiredTx = i - break - } else { + // txs retains the ordering of insertion + if p.config.TTLNumBlocks > 0 { + idxFirstNotExpiredTx := len(inner.txs) + for i, ptx := range inner.txs { + // once found, we can break because these are ordered + if (blockHeight - ptx.tx.height) <= p.config.TTLNumBlocks { + idxFirstNotExpiredTx = i + break + } cb(ptx.tx) - p.sizeBytes -= uint64(ptx.tx.Size()) //nolint:gosec // Size() is non-negative + p.sizeBytes.Add(int64(-ptx.tx.Size())) } + inner.txs = inner.txs[idxFirstNotExpiredTx:] } - p.txs = p.txs[idxFirstNotExpiredTx:] - } - if len(p.txs) == 0 { - return - } + if len(inner.txs) == 0 { + return + } - if p.config.TTLDuration > 0 { - idxFirstNotExpiredTx := len(p.txs) - for i, ptx := range p.txs { - // once found, we can break because these are ordered - if now.Sub(ptx.tx.timestamp) <= p.config.TTLDuration { - idxFirstNotExpiredTx = i - break - } else { + if p.config.TTLDuration > 0 { + idxFirstNotExpiredTx := len(inner.txs) + for i, ptx := range inner.txs { + // once found, we can break because these are ordered + if now.Sub(ptx.tx.timestamp) <= p.config.TTLDuration { + idxFirstNotExpiredTx = i + break + } cb(ptx.tx) - p.sizeBytes -= uint64(ptx.tx.Size()) //nolint:gosec // Size() is non-negative + p.sizeBytes.Add(int64(-ptx.tx.Size())) } + inner.txs = inner.txs[idxFirstNotExpiredTx:] } - p.txs = p.txs[idxFirstNotExpiredTx:] + return } + panic("unreachable") } diff --git a/sei-tendermint/internal/mempool/tx_test.go b/sei-tendermint/internal/mempool/tx_test.go index 5ccc36b613..dba09dcacf 100644 --- a/sei-tendermint/internal/mempool/tx_test.go +++ b/sei-tendermint/internal/mempool/tx_test.go @@ -318,16 +318,20 @@ func TestPendingTxsPopTxsGood(t *testing.T) { expected: []int{0, 2, 4}, }, } { - pendingTxs.txs = []TxWithResponse{} - for i := 0; i < test.origLen; i++ { - pendingTxs.txs = append(pendingTxs.txs, TxWithResponse{ - tx: &WrappedTx{tx: []byte{}}, - txInfo: TxInfo{SenderID: uint16(i)}}) - } - pendingTxs.popTxsAtIndices(test.popIndices) - require.Equal(t, len(test.expected), len(pendingTxs.txs)) - for i, e := range test.expected { - require.Equal(t, e, int(pendingTxs.txs[i].txInfo.SenderID)) + for inner := range pendingTxs.inner.Lock() { + inner.txs = []TxWithResponse{} + pendingTxs.sizeBytes.Store(0) + for i := 0; i < test.origLen; i++ { + inner.txs = append(inner.txs, TxWithResponse{ + tx: &WrappedTx{tx: []byte{}}, + txInfo: TxInfo{SenderID: uint16(i)}, + }) + } + pendingTxs.popTxsAtIndices(inner, test.popIndices) + require.Equal(t, len(test.expected), len(inner.txs)) + for i, e := range test.expected { + require.Equal(t, e, int(inner.txs[i].txInfo.SenderID)) + } } } } @@ -335,12 +339,26 @@ func TestPendingTxsPopTxsGood(t *testing.T) { func TestPendingTxsPopTxsBad(t *testing.T) { pendingTxs := NewPendingTxs(DefaultConfig()) // out of range - require.Panics(t, func() { pendingTxs.popTxsAtIndices([]int{0}) }) + require.Panics(t, func() { + for inner := range pendingTxs.inner.Lock() { + pendingTxs.popTxsAtIndices(inner, []int{0}) + } + }) // out of order - pendingTxs.txs = []TxWithResponse{{}, {}, {}} - require.Panics(t, func() { pendingTxs.popTxsAtIndices([]int{1, 0}) }) + for inner := range pendingTxs.inner.Lock() { + inner.txs = []TxWithResponse{{}, {}, {}} + } + require.Panics(t, func() { + for inner := range pendingTxs.inner.Lock() { + pendingTxs.popTxsAtIndices(inner, []int{1, 0}) + } + }) // duplicate - require.Panics(t, func() { pendingTxs.popTxsAtIndices([]int{2, 2}) }) + require.Panics(t, func() { + for inner := range pendingTxs.inner.Lock() { + pendingTxs.popTxsAtIndices(inner, []int{2, 2}) + } + }) } func TestPendingTxs_InsertCondition(t *testing.T) { From 64c226caf79fd3a4582e2a86ba1278dfd43de4fd Mon Sep 17 00:00:00 2001 From: Grzegorz Prusak Date: Thu, 23 Apr 2026 10:27:03 +0200 Subject: [PATCH 2/2] applied comments --- .../internal/mempool/reactor/reactor.go | 2 +- .../internal/mempool/reactor/reactor_test.go | 114 +++++++++--------- 2 files changed, 56 insertions(+), 60 deletions(-) diff --git a/sei-tendermint/internal/mempool/reactor/reactor.go b/sei-tendermint/internal/mempool/reactor/reactor.go index 2429ba1e23..cee2e008a2 100644 --- a/sei-tendermint/internal/mempool/reactor/reactor.go +++ b/sei-tendermint/internal/mempool/reactor/reactor.go @@ -220,7 +220,7 @@ func (r *Reactor) processPeerUpdates(ctx context.Context) error { pctx, pcancel := context.WithCancel(ctx) peerRoutines[update.NodeID] = pcancel r.ids.ReserveForPeer(update.NodeID) - // We keep peer management even then broadcasting is disabled, + // We keep peer management even when broadcasting is disabled, // so that failedCheckTxCounts WAI. if r.cfg.Broadcast { s.Spawn(func() error { diff --git a/sei-tendermint/internal/mempool/reactor/reactor_test.go b/sei-tendermint/internal/mempool/reactor/reactor_test.go index aa35af5820..4380b472b9 100644 --- a/sei-tendermint/internal/mempool/reactor/reactor_test.go +++ b/sei-tendermint/internal/mempool/reactor/reactor_test.go @@ -84,21 +84,18 @@ func convertTex(in []testTx) types.Txs { } func setupReactors(ctx context.Context, t *testing.T, numNodes int) *reactorTestSuite { - return setupReactorsWithTxConstraintsFetchers(ctx, t, numNodes, nil) + return setupReactorsWithConfig(ctx, t, numNodes, config.TestMempoolConfig(), mempool.NopTxConstraintsFetcher) } -func setupReactorsWithTxConstraintsFetchers( +func setupReactorsWithConfig( ctx context.Context, t *testing.T, numNodes int, - txConstraintsFetchers map[int]mempool.TxConstraintsFetcher, + cfg *config.MempoolConfig, + txConstraintsFetcher mempool.TxConstraintsFetcher, ) *reactorTestSuite { t.Helper() - cfg, err := config.ResetTestRoot(t.TempDir(), strings.ReplaceAll(t.Name(), "/", "|")) - require.NoError(t, err) - t.Cleanup(func() { os.RemoveAll(cfg.RootDir) }) - rts := &reactorTestSuite{ network: p2p.MakeTestNetwork(t, p2p.TestNetworkOptions{NumNodes: numNodes}), reactors: make(map[types.NodeID]*Reactor, numNodes), @@ -106,19 +103,15 @@ func setupReactorsWithTxConstraintsFetchers( kvstores: make(map[types.NodeID]*kvstore.Application, numNodes), } - for i, node := range rts.network.Nodes() { + for _, node := range rts.network.Nodes() { nodeID := node.NodeID rts.kvstores[nodeID] = kvstore.NewApplication() app := rts.kvstores[nodeID] - txConstraintsFetcher := mempool.NopTxConstraintsFetcher - if customFetcher, ok := txConstraintsFetchers[i]; ok { - txConstraintsFetcher = customFetcher - } txmp := setupMempool(t, app, 0, txConstraintsFetcher) rts.mempools[nodeID] = txmp - reactor, err := NewReactor(config.TestMempoolConfig(), txmp, node.Router) + reactor, err := NewReactor(cfg, txmp, node.Router) if err != nil { t.Fatalf("NewReactor(): %v", err) } @@ -222,58 +215,61 @@ func TestReactorBroadcastTxs(t *testing.T) { } func TestReactorFailedCheckTxCountEvictsPeer(t *testing.T) { - ctx := t.Context() - - rts := setupReactorsWithTxConstraintsFetchers(ctx, t, 2, map[int]mempool.TxConstraintsFetcher{ - 1: func() (mempool.TxConstraints, error) { - return mempool.TxConstraints{ - MaxDataBytes: 10, - MaxGas: -1, - }, nil - }, - }) - t.Cleanup(leaktest.Check(t)) - - sender := rts.nodes[0] - receiver := rts.nodes[1] - - receiverReactor := rts.reactors[receiver] - receiverReactor.cfg.CheckTxErrorBlacklistEnabled = true - receiverReactor.cfg.CheckTxErrorThreshold = 2 - - rts.start(t) - conn := rts.network.Node(receiver).WaitForConnAndGet(ctx, sender) - - msgForTx := func(tx []byte) p2p.RecvMsg[*pb.Message] { - return p2p.RecvMsg[*pb.Message]{ - From: sender, - Message: &pb.Message{ - Sum: &pb.Message_Txs{ - Txs: &pb.Txs{Txs: [][]byte{tx}}, - }, - }, - } - } + for _, broadcast := range []bool{true, false} { + t.Run(fmt.Sprintf("broadcast=%v", broadcast), func(t *testing.T) { + ctx := t.Context() + + cfg := config.TestMempoolConfig() + cfg.Broadcast = broadcast + cfg.CheckTxErrorBlacklistEnabled = true + cfg.CheckTxErrorThreshold = 2 + + rts := setupReactorsWithConfig(ctx, t, 2, cfg, func() (mempool.TxConstraints, error) { + return mempool.TxConstraints{ + MaxDataBytes: 10, + MaxGas: -1, + }, nil + }) + t.Cleanup(leaktest.Check(t)) + + sender := rts.nodes[0] + receiver := rts.nodes[1] + receiverReactor := rts.reactors[receiver] + rts.start(t) + conn := rts.network.Node(receiver).WaitForConnAndGet(ctx, sender) + + msgForTx := func(tx []byte) p2p.RecvMsg[*pb.Message] { + return p2p.RecvMsg[*pb.Message]{ + From: sender, + Message: &pb.Message{ + Sum: &pb.Message_Txs{ + Txs: &pb.Txs{Txs: [][]byte{tx}}, + }, + }, + } + } - require.Eventually(t, func() bool { - return peerFailedCheckTxCount(receiverReactor, sender) == utils.Some(0) - }, time.Second, 50*time.Millisecond) + require.Eventually(t, func() bool { + return peerFailedCheckTxCount(receiverReactor, sender) == utils.Some(0) + }, time.Second, 50*time.Millisecond) - require.NoError(t, receiverReactor.handleMempoolMessage(ctx, msgForTx([]byte("good-1")))) - require.Equal(t, utils.Some(0), peerFailedCheckTxCount(receiverReactor, sender)) + require.NoError(t, receiverReactor.handleMempoolMessage(ctx, msgForTx([]byte("good-1")))) + require.Equal(t, utils.Some(0), peerFailedCheckTxCount(receiverReactor, sender)) - badTx := []byte("bad-transaction") - require.NoError(t, receiverReactor.handleMempoolMessage(ctx, msgForTx(badTx))) - require.Equal(t, utils.Some(1), peerFailedCheckTxCount(receiverReactor, sender)) + badTx := []byte("bad-transaction") + require.NoError(t, receiverReactor.handleMempoolMessage(ctx, msgForTx(badTx))) + require.Equal(t, utils.Some(1), peerFailedCheckTxCount(receiverReactor, sender)) - require.NoError(t, receiverReactor.handleMempoolMessage(ctx, msgForTx([]byte("good-2")))) - require.Equal(t, utils.Some(1), peerFailedCheckTxCount(receiverReactor, sender)) + require.NoError(t, receiverReactor.handleMempoolMessage(ctx, msgForTx([]byte("good-2")))) + require.Equal(t, utils.Some(1), peerFailedCheckTxCount(receiverReactor, sender)) - require.NoError(t, receiverReactor.handleMempoolMessage(ctx, msgForTx(badTx))) - require.Equal(t, utils.Some(2), peerFailedCheckTxCount(receiverReactor, sender)) + require.NoError(t, receiverReactor.handleMempoolMessage(ctx, msgForTx(badTx))) + require.Equal(t, utils.Some(2), peerFailedCheckTxCount(receiverReactor, sender)) - require.NoError(t, receiverReactor.handleMempoolMessage(ctx, msgForTx(badTx))) - rts.network.Node(receiver).WaitForDisconnect(ctx, conn) + require.NoError(t, receiverReactor.handleMempoolMessage(ctx, msgForTx(badTx))) + rts.network.Node(receiver).WaitForDisconnect(ctx, conn) + }) + } } func TestReactorPeerDownClearsFailedCheckTxCount(t *testing.T) {