From 2e3164c2b3d63f7824195aec7b2e4fb7a8987a50 Mon Sep 17 00:00:00 2001 From: 0xbigapple Date: Fri, 17 Apr 2026 11:41:14 +0800 Subject: [PATCH] fix(net): fix RejectedExecutionException during shutdown trxHandlePoo --- .../TransactionsMsgHandler.java | 27 ++- .../TransactionsMsgHandlerTest.java | 159 ++++++++++++++++++ 2 files changed, 183 insertions(+), 3 deletions(-) diff --git a/framework/src/main/java/org/tron/core/net/messagehandler/TransactionsMsgHandler.java b/framework/src/main/java/org/tron/core/net/messagehandler/TransactionsMsgHandler.java index 0436b48d374..961646de4ee 100644 --- a/framework/src/main/java/org/tron/core/net/messagehandler/TransactionsMsgHandler.java +++ b/framework/src/main/java/org/tron/core/net/messagehandler/TransactionsMsgHandler.java @@ -3,6 +3,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import lombok.Getter; @@ -44,6 +45,7 @@ public class TransactionsMsgHandler implements TronMsgHandler { private BlockingQueue queue = new LinkedBlockingQueue(); + private volatile boolean isClosed = false; private int threadNum = Args.getInstance().getValidateSignThreadNum(); private final String trxEsName = "trx-msg-handler"; private ExecutorService trxHandlePool = ExecutorServiceManager.newThreadPoolExecutor( @@ -58,8 +60,14 @@ public void init() { } public void close() { - ExecutorServiceManager.shutdownAndAwaitTermination(trxHandlePool, trxEsName); + isClosed = true; + // Stop the scheduler first so no new tasks are drained from smartContractQueue. ExecutorServiceManager.shutdownAndAwaitTermination(smartContractExecutor, smartEsName); + // Then shutdown the worker pool to finish already-submitted tasks. + ExecutorServiceManager.shutdownAndAwaitTermination(trxHandlePool, trxEsName); + // Discard any remaining items and release references. + smartContractQueue.clear(); + queue.clear(); } public boolean isBusy() { @@ -68,6 +76,10 @@ public boolean isBusy() { @Override public void processMessage(PeerConnection peer, TronMessage msg) throws P2pException { + if (isClosed) { + logger.warn("TransactionsMsgHandler is closed, drop message"); + return; + } TransactionsMessage transactionsMessage = (TransactionsMessage) msg; check(peer, transactionsMessage); for (Transaction trx : transactionsMessage.getTransactions().getTransactionsList()) { @@ -78,6 +90,10 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep int trxHandlePoolQueueSize = 0; int dropSmartContractCount = 0; for (Transaction trx : transactionsMessage.getTransactions().getTransactionsList()) { + if (isClosed) { + logger.warn("TransactionsMsgHandler is closed during processing, stop submit"); + break; + } int type = trx.getRawData().getContract(0).getType().getNumber(); if (type == ContractType.TriggerSmartContract_VALUE || type == ContractType.CreateSmartContract_VALUE) { @@ -87,8 +103,13 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep dropSmartContractCount++; } } else { - ExecutorServiceManager.submit( - trxHandlePool, () -> handleTransaction(peer, new TransactionMessage(trx))); + try { + ExecutorServiceManager.submit( + trxHandlePool, () -> handleTransaction(peer, new TransactionMessage(trx))); + } catch (RejectedExecutionException e) { + logger.warn("Submit task to {} failed", trxEsName); + break; + } } } diff --git a/framework/src/test/java/org/tron/core/net/messagehandler/TransactionsMsgHandlerTest.java b/framework/src/test/java/org/tron/core/net/messagehandler/TransactionsMsgHandlerTest.java index db8aac00c60..e376516b945 100644 --- a/framework/src/test/java/org/tron/core/net/messagehandler/TransactionsMsgHandlerTest.java +++ b/framework/src/test/java/org/tron/core/net/messagehandler/TransactionsMsgHandlerTest.java @@ -3,12 +3,15 @@ import com.google.protobuf.Any; import com.google.protobuf.ByteString; import java.lang.reflect.Field; +import java.lang.reflect.Method; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; import lombok.Getter; import org.joda.time.DateTime; @@ -20,7 +23,10 @@ import org.tron.common.TestConstants; import org.tron.common.runtime.TvmTestUtils; import org.tron.common.utils.ByteArray; +import org.tron.core.ChainBaseManager; import org.tron.core.config.args.Args; +import org.tron.core.exception.P2pException; +import org.tron.core.exception.P2pException.TypeEnum; import org.tron.core.net.TronNetDelegate; import org.tron.core.net.message.adv.TransactionMessage; import org.tron.core.net.message.adv.TransactionsMessage; @@ -132,6 +138,159 @@ public void testProcessMessage() { } } + @Test + public void testProcessMessageAfterClose() throws Exception { + TransactionsMsgHandler handler = new TransactionsMsgHandler(); + handler.init(); + handler.close(); + + PeerConnection peer = Mockito.mock(PeerConnection.class); + TransactionsMessage msg = Mockito.mock(TransactionsMessage.class); + + handler.processMessage(peer, msg); + + Mockito.verify(msg, Mockito.never()).getTransactions(); + Mockito.verifyNoInteractions(peer); + } + + @Test + public void testRejectedExecution() throws Exception { + TransactionsMsgHandler handler = new TransactionsMsgHandler(); + try { + ExecutorService mockPool = Mockito.mock(ExecutorService.class); + Mockito.when(mockPool.submit(Mockito.any(Runnable.class))) + .thenThrow(new RejectedExecutionException("pool closed")); + Field poolField = TransactionsMsgHandler.class.getDeclaredField("trxHandlePool"); + poolField.setAccessible(true); + poolField.set(handler, mockPool); + + PeerConnection peer = Mockito.mock(PeerConnection.class); + TransactionsMessage msg = buildTransferMessage(2); + stubAdvInvRequest(peer, msg); + // 2 transfer transactions, submit throws on the first → catch + break, only called once + handler.processMessage(peer, msg); + + Mockito.verify(mockPool, Mockito.times(1)).submit(Mockito.any(Runnable.class)); + } finally { + handler.close(); + } + } + + @Test + public void testCloseDuringProcessing() throws Exception { + TransactionsMsgHandler handler = new TransactionsMsgHandler(); + try { + Field closedField = TransactionsMsgHandler.class.getDeclaredField("isClosed"); + closedField.setAccessible(true); + + ExecutorService mockPool = Mockito.mock(ExecutorService.class); + // on the first submit, flip isClosed to true so the second iteration breaks + Mockito.when(mockPool.submit(Mockito.any(Runnable.class))).thenAnswer(inv -> { + closedField.set(handler, true); + return null; + }); + Field poolField = TransactionsMsgHandler.class.getDeclaredField("trxHandlePool"); + poolField.setAccessible(true); + poolField.set(handler, mockPool); + + PeerConnection peer = Mockito.mock(PeerConnection.class); + TransactionsMessage msg = buildTransferMessage(2); + stubAdvInvRequest(peer, msg); + handler.processMessage(peer, msg); + + Mockito.verify(mockPool, Mockito.times(1)).submit(Mockito.any(Runnable.class)); + } finally { + handler.close(); + } + } + + private TransactionsMessage buildTransferMessage(int count) { + List txs = new ArrayList<>(); + for (int i = 0; i < count; i++) { + BalanceContract.TransferContract tc = BalanceContract.TransferContract.newBuilder() + .setAmount(10 + i) + .setOwnerAddress(ByteString.copyFrom(ByteArray.fromHexString("121212a9cf"))) + .setToAddress(ByteString.copyFrom(ByteArray.fromHexString("232323a9cf"))) + .build(); + txs.add(Protocol.Transaction.newBuilder().setRawData( + Protocol.Transaction.raw.newBuilder() + .setTimestamp(1_700_000_000_000L + i) + .setRefBlockNum(1) + .addContract(Protocol.Transaction.Contract.newBuilder() + .setType(Protocol.Transaction.Contract.ContractType.TransferContract) + .setParameter(Any.pack(tc)).build()).build()) + .build()); + } + return new TransactionsMessage(txs); + } + + private void stubAdvInvRequest(PeerConnection peer, TransactionsMessage msg) { + Map advInvRequest = new ConcurrentHashMap<>(); + for (Protocol.Transaction trx : msg.getTransactions().getTransactionsList()) { + Item item = new Item(new TransactionMessage(trx).getMessageId(), + Protocol.Inventory.InventoryType.TRX); + advInvRequest.put(item, 0L); + } + Mockito.when(peer.getAdvInvRequest()).thenReturn(advInvRequest); + } + + @Test + public void testHandleTransaction() throws Exception { + TransactionsMsgHandler handler = new TransactionsMsgHandler(); + try { + TronNetDelegate tronNetDelegate = Mockito.mock(TronNetDelegate.class); + AdvService advService = Mockito.mock(AdvService.class); + ChainBaseManager chainBaseManager = Mockito.mock(ChainBaseManager.class); + + Field f1 = TransactionsMsgHandler.class.getDeclaredField("tronNetDelegate"); + f1.setAccessible(true); + f1.set(handler, tronNetDelegate); + Field f2 = TransactionsMsgHandler.class.getDeclaredField("advService"); + f2.setAccessible(true); + f2.set(handler, advService); + Field f3 = TransactionsMsgHandler.class.getDeclaredField("chainBaseManager"); + f3.setAccessible(true); + f3.set(handler, chainBaseManager); + + PeerConnection peer = Mockito.mock(PeerConnection.class); + + BalanceContract.TransferContract tc = BalanceContract.TransferContract.newBuilder() + .setAmount(10) + .setOwnerAddress(ByteString.copyFrom(ByteArray.fromHexString("121212a9cf"))) + .setToAddress(ByteString.copyFrom(ByteArray.fromHexString("232323a9cf"))) + .build(); + long now = System.currentTimeMillis(); + Protocol.Transaction trx = Protocol.Transaction.newBuilder().setRawData( + Protocol.Transaction.raw.newBuilder() + .setTimestamp(now) + .setExpiration(now + 60_000) + .setRefBlockNum(1) + .addContract(Protocol.Transaction.Contract.newBuilder() + .setType(Protocol.Transaction.Contract.ContractType.TransferContract) + .setParameter(Any.pack(tc)).build()).build()) + .build(); + TransactionMessage trxMsg = new TransactionMessage(trx); + + Method handleTx = TransactionsMsgHandler.class.getDeclaredMethod( + "handleTransaction", PeerConnection.class, TransactionMessage.class); + handleTx.setAccessible(true); + + // happy path → push and broadcast + Mockito.when(chainBaseManager.getNextBlockSlotTime()).thenReturn(now); + handleTx.invoke(handler, peer, trxMsg); + Mockito.verify(advService).broadcast(trxMsg); + + // P2pException BAD_TRX → disconnect + Mockito.doThrow(new P2pException(TypeEnum.BAD_TRX, "bad")) + .when(tronNetDelegate).pushTransaction(Mockito.any()); + handleTx.invoke(handler, peer, trxMsg); + Mockito.verify(peer).setBadPeer(true); + Mockito.verify(peer).disconnect(Protocol.ReasonCode.BAD_TX); + } finally { + handler.close(); + } + } + class TrxEvent { @Getter