@@ -3,6 +3,7 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.Getter;
@@ -44,6 +45,7 @@ public class TransactionsMsgHandler implements TronMsgHandler {

private BlockingQueue<Runnable> queue = new LinkedBlockingQueue();

private volatile boolean isClosed = false;
private int threadNum = Args.getInstance().getValidateSignThreadNum();
private final String trxEsName = "trx-msg-handler";
private ExecutorService trxHandlePool = ExecutorServiceManager.newThreadPoolExecutor(
@@ -58,8 +60,14 @@ public void init() {
}

public void close() {
ExecutorServiceManager.shutdownAndAwaitTermination(trxHandlePool, trxEsName);
isClosed = true;
// Stop the scheduler first so no new tasks are drained from smartContractQueue.
ExecutorServiceManager.shutdownAndAwaitTermination(smartContractExecutor, smartEsName);
// Then shutdown the worker pool to finish already-submitted tasks.
ExecutorServiceManager.shutdownAndAwaitTermination(trxHandlePool, trxEsName);
// Discard any remaining items and release references.
smartContractQueue.clear();
queue.clear();
Comment on lines +69 to +70
317787106 (Collaborator), Apr 17, 2026:

[NIT] These two clear() calls may be unnecessary. After shutdownAndAwaitTermination(trxHandlePool) returns, all tasks have completed, and queue is the backing queue of the thread pool.

These two comments may be also unnecessary, it's so simple & clear.

Author replied:

Thanks for the review; one nuance though: smartContractQueue is not a backing queue. It's an independent LinkedBlockingQueue&lt;TrxEvent&gt;: processMessage offers into it, and smartContractExecutor takes from it only when queue.size() < MAX_SMART_CONTRACT_SUBMIT_SIZE=100. So under backpressure (when trxHandlePool's queue is saturated), the scheduler stops draining smartContractQueue, and it can hold up to MAX_TRX_SIZE=50_000 pending TrxEvents at shutdown. Explicit cleanup of state that may contain data matches the convention elsewhere in java-tron, so I'd like to keep smartContractQueue.clear().

queue is the backing queue as you said — queue.clear() is technically redundant after shutdownAndAwaitTermination. I added it purely for symmetry with smartContractQueue.clear(). If you think the symmetry isn't worth the redundancy, happy to drop queue.clear() alone.

I'll remove the latter two comments (// Then shutdown the worker pool ... and // Discard any remaining items ...).
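The distinction the thread hinges on can be reproduced in a few lines. The following is a standalone sketch with made-up queue contents, not java-tron code: an independent queue keeps its items after its consumer is shut down, while a pool's backing queue is drained by the workers themselves.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class QueueRetentionDemo {
  public static void main(String[] args) throws InterruptedException {
    // Independent queue (like smartContractQueue): nothing drains it once
    // the scheduler that consumed from it has terminated.
    BlockingQueue<String> independent = new LinkedBlockingQueue<>();
    independent.offer("pending-tx-1");
    independent.offer("pending-tx-2");

    // Backing queue of a pool (like the handler's queue field): workers
    // drain it, so it is empty after shutdown + awaitTermination.
    BlockingQueue<Runnable> backing = new LinkedBlockingQueue<>();
    ThreadPoolExecutor pool =
        new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, backing);
    pool.submit(() -> { });
    pool.shutdown();
    pool.awaitTermination(5, TimeUnit.SECONDS);

    System.out.println("independent=" + independent.size()); // still 2
    System.out.println("backing=" + backing.size());         // 0
  }
}
```

This is why clearing the independent queue releases real references, while clearing the backing queue after termination is a no-op.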

}

public boolean isBusy() {
Expand All @@ -68,6 +76,10 @@ public boolean isBusy() {

@Override
public void processMessage(PeerConnection peer, TronMessage msg) throws P2pException {
if (isClosed) {
Collaborator commented:

[SHOULD] Based on your problem description, the exception should occur in the handleSmartContract() function. You don't need to modify the processMessage function much; this check can be removed.

0xbigapple (Author), Apr 20, 2026, replied:

Good point: the PR description's "Why" section only covers the handleSmartContract() REE and doesn't spell out the second race; that's my oversight. The other scenario is the processMessage()-side race, which I described in the reply to @317787106 in #6692 (comment): during shutdown, an already in-flight or concurrently delivered TransactionsMessage can still enter processMessage() before peer removal is fully observed, while trxHandlePool is already shutting down. That's the real REE window that the isClosed checks guard against as defense-in-depth.

Collaborator replied:

When a node shuts down, it closes the peers first, so peer packets are no longer processed. Other packet handlers don't have this check logic. Even if this path were triggered without a specific check, no exception would be thrown.

Author replied:

Thanks for your feedback! I think there is still a race here; shutdown is not hard-synchronized with the message-handling path.
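To make the race concrete, here is a minimal standalone sketch (hypothetical class and method names, not the real handler) of why the volatile flag alone cannot fully close the window: the flag check and the submit form a classic check-then-act pair, so a catch at the submit site remains the backstop.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

public class ShutdownRaceSketch {
  private volatile boolean isClosed = false;
  private final ExecutorService pool = Executors.newFixedThreadPool(1);

  void close() {
    isClosed = true;
    pool.shutdown();
  }

  void processMessage(Runnable task) {
    if (isClosed) {
      return; // fast-path drop once shutdown has been observed
    }
    try {
      pool.submit(task);
    } catch (RejectedExecutionException e) {
      // close() ran between the flag check and the submit; drop quietly
    }
  }

  public static void main(String[] args) {
    ShutdownRaceSketch handler = new ShutdownRaceSketch();
    handler.close();
    handler.processMessage(() -> { }); // dropped by the flag; no REE escapes
    System.out.println("done");
  }
}
```

Without hard synchronization between close() and the message path, both layers together are what make the drop safe.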

logger.warn("TransactionsMsgHandler is closed, drop message");
Comment:

[QUESTION] Both isClosed guards log at WARN level. During a normal node shutdown this will fire once per in-flight message, potentially producing a large burst of WARN entries in the log. Is there a reason to prefer WARN over DEBUG or INFO here?

Author replied:

Good point — I wasn't confident about the right level here. I checked the rest of the codebase and java-tron has two conventions that both have a claim on these lines:

  • "Drop" runtime events → WARN (AdvService.java:200, P2pEventHandlerImpl.java:158, InventoryMsgHandler.java:54/60/66, and the other two Drop lines in this same file at L117/L155).
  • Lifecycle close/shutdown normal path → INFO (TronNetService.java:126 "Net service closed successfully", HistoryEventService.java:59, ConsensusService.java:89/91, ExecutorServiceManager.java:67/84, AbstractService.java:36/41, BackupServer.java:107).

L80/L94 sit on the boundary. The existing WARN followed the local "Drop" convention in this file. But your actionability argument is fair — during shutdown ops can't act on these, whereas the other Drop WARNs are actual runtime anomalies worth investigating.

Happy to flip both to INFO to match Convention B (closing-window expected event) if you agree, or keep WARN to stay consistent with the other Drop lines in this file. DEBUG feels too quiet since a one-time record of "messages were dropped during shutdown" is still useful for post-mortem. What do you think?

Reply:

Agreed. On reflection, changing L80, L94, and L109 to INFO is the better choice. During normal node shutdown these are expected events, so WARN is too noisy and risks drowning out real runtime issues. The other Drop WARNs in this file (runtime backpressure) should stay as-is.

return;
}
TransactionsMessage transactionsMessage = (TransactionsMessage) msg;
check(peer, transactionsMessage);
for (Transaction trx : transactionsMessage.getTransactions().getTransactionsList()) {
@@ -78,6 +90,10 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep
int trxHandlePoolQueueSize = 0;
int dropSmartContractCount = 0;
for (Transaction trx : transactionsMessage.getTransactions().getTransactionsList()) {
if (isClosed) {
Collaborator commented:

[SHOULD] Duplicate checks can be deleted.

logger.warn("TransactionsMsgHandler is closed during processing, stop submit");
Collaborator commented:

[NIT] Some transactions are processed, but some are dropped. Do we need to log the unprocessed tx num?

Author replied:

Thanks. I think it's fine to leave as-is — the two break sites only fire during shutdown, so a numeric drop count there can't really drive any operational action. The existing simple warn lines are enough as "this path was exercised" signals.

The existing dropSmartContractCount at L100-104 is a different case — that's runtime backpressure (queue saturated), which is an actionable ops signal, so it's already counted.

break;
}
int type = trx.getRawData().getContract(0).getType().getNumber();
if (type == ContractType.TriggerSmartContract_VALUE
|| type == ContractType.CreateSmartContract_VALUE) {
@@ -87,8 +103,13 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep
dropSmartContractCount++;
}
} else {
ExecutorServiceManager.submit(
trxHandlePool, () -> handleTransaction(peer, new TransactionMessage(trx)));
try {
ExecutorServiceManager.submit(
trxHandlePool, () -> handleTransaction(peer, new TransactionMessage(trx)));
} catch (RejectedExecutionException e) {
logger.warn("Submit task to {} failed", trxEsName);
break;
}
Collaborator commented:

[SHOULD] The exception handling of handleTransaction is not the same as handleSmartContract(); the latter lacks a RejectedExecutionException catch. They should catch the same exceptions.

0xbigapple (Author), Apr 17, 2026, replied:

Thanks for the review. The two submit() sites are asymmetric on purpose:

  • processMessage() is invoked by the p2p dispatch path. During close(), an already in-flight or concurrently delivered message can still enter processMessage() before peer removal is fully observed, while the pool is already shutting down. That's the real REE window, so this path needs the catch.
  • handleSmartContract() runs inside smartContractExecutor. In close() we deliberately shut the scheduler down before trxHandlePool, so by the time the pool is closed the scheduler has already terminated — no race window. The existing catch (Exception e) already absorbs anything unexpected.

Adding an REE catch here would document a case that cannot happen given the shutdown ordering, and invite future readers to ask "when can this fire?". So, I'd like to keep handleSmartContract() as-is.
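The ordering argument can be sketched standalone. This is a toy producer/consumer with assumed names, not the real scheduler loop: once the producing scheduler has terminated, nothing can submit to the pool anymore, so shutting the pool down afterwards leaves no RejectedExecutionException window.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ShutdownOrderingDemo {
  public static void main(String[] args) throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(1);
    ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    AtomicInteger handled = new AtomicInteger();

    // Producer: periodically pushes work into the pool, like the
    // smartContractExecutor loop feeding trxHandlePool.
    scheduler.scheduleWithFixedDelay(
        () -> pool.execute(handled::incrementAndGet), 0, 1, TimeUnit.MILLISECONDS);

    Thread.sleep(50);

    // Scheduler first: after it has terminated, no task can call
    // pool.execute(), so closing the pool next cannot race with a submit.
    scheduler.shutdown();
    scheduler.awaitTermination(5, TimeUnit.SECONDS);
    pool.shutdown();
    pool.awaitTermination(5, TimeUnit.SECONDS);

    System.out.println("handled " + handled.get() + " tasks, no REE thrown");
  }
}
```

Reversing the two shutdowns would reopen the window: the still-running scheduler could fire once more against an already shut-down pool.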

}
}

@@ -3,12 +3,15 @@
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;

import lombok.Getter;
import org.joda.time.DateTime;
@@ -20,7 +23,10 @@
import org.tron.common.TestConstants;
import org.tron.common.runtime.TvmTestUtils;
import org.tron.common.utils.ByteArray;
import org.tron.core.ChainBaseManager;
import org.tron.core.config.args.Args;
import org.tron.core.exception.P2pException;
import org.tron.core.exception.P2pException.TypeEnum;
import org.tron.core.net.TronNetDelegate;
import org.tron.core.net.message.adv.TransactionMessage;
import org.tron.core.net.message.adv.TransactionsMessage;
@@ -132,6 +138,159 @@ public void testProcessMessage() {
}
}

@Test
public void testProcessMessageAfterClose() throws Exception {
TransactionsMsgHandler handler = new TransactionsMsgHandler();
handler.init();
handler.close();

PeerConnection peer = Mockito.mock(PeerConnection.class);
TransactionsMessage msg = Mockito.mock(TransactionsMessage.class);

handler.processMessage(peer, msg);

Mockito.verify(msg, Mockito.never()).getTransactions();
Mockito.verifyNoInteractions(peer);
}

@Test
public void testRejectedExecution() throws Exception {
TransactionsMsgHandler handler = new TransactionsMsgHandler();
try {
ExecutorService mockPool = Mockito.mock(ExecutorService.class);
Mockito.when(mockPool.submit(Mockito.any(Runnable.class)))
.thenThrow(new RejectedExecutionException("pool closed"));
Field poolField = TransactionsMsgHandler.class.getDeclaredField("trxHandlePool");
poolField.setAccessible(true);
poolField.set(handler, mockPool);

PeerConnection peer = Mockito.mock(PeerConnection.class);
TransactionsMessage msg = buildTransferMessage(2);
stubAdvInvRequest(peer, msg);
// 2 transfer transactions, submit throws on the first → catch + break, only called once
handler.processMessage(peer, msg);

Mockito.verify(mockPool, Mockito.times(1)).submit(Mockito.any(Runnable.class));
} finally {
handler.close();
}
}

@Test
public void testCloseDuringProcessing() throws Exception {
TransactionsMsgHandler handler = new TransactionsMsgHandler();
try {
Field closedField = TransactionsMsgHandler.class.getDeclaredField("isClosed");
closedField.setAccessible(true);

ExecutorService mockPool = Mockito.mock(ExecutorService.class);
// on the first submit, flip isClosed to true so the second iteration breaks
Mockito.when(mockPool.submit(Mockito.any(Runnable.class))).thenAnswer(inv -> {
closedField.set(handler, true);
return null;
});
Field poolField = TransactionsMsgHandler.class.getDeclaredField("trxHandlePool");
poolField.setAccessible(true);
poolField.set(handler, mockPool);

PeerConnection peer = Mockito.mock(PeerConnection.class);
TransactionsMessage msg = buildTransferMessage(2);
stubAdvInvRequest(peer, msg);
handler.processMessage(peer, msg);

Mockito.verify(mockPool, Mockito.times(1)).submit(Mockito.any(Runnable.class));
} finally {
handler.close();
}
}

private TransactionsMessage buildTransferMessage(int count) {
List<Protocol.Transaction> txs = new ArrayList<>();
for (int i = 0; i < count; i++) {
BalanceContract.TransferContract tc = BalanceContract.TransferContract.newBuilder()
.setAmount(10 + i)
.setOwnerAddress(ByteString.copyFrom(ByteArray.fromHexString("121212a9cf")))
.setToAddress(ByteString.copyFrom(ByteArray.fromHexString("232323a9cf")))
.build();
txs.add(Protocol.Transaction.newBuilder().setRawData(
Protocol.Transaction.raw.newBuilder()
.setTimestamp(1_700_000_000_000L + i)
.setRefBlockNum(1)
.addContract(Protocol.Transaction.Contract.newBuilder()
.setType(Protocol.Transaction.Contract.ContractType.TransferContract)
.setParameter(Any.pack(tc)).build()).build())
.build());
}
return new TransactionsMessage(txs);
}

private void stubAdvInvRequest(PeerConnection peer, TransactionsMessage msg) {
Map<Item, Long> advInvRequest = new ConcurrentHashMap<>();
for (Protocol.Transaction trx : msg.getTransactions().getTransactionsList()) {
Item item = new Item(new TransactionMessage(trx).getMessageId(),
Protocol.Inventory.InventoryType.TRX);
advInvRequest.put(item, 0L);
}
Mockito.when(peer.getAdvInvRequest()).thenReturn(advInvRequest);
}

@Test
public void testHandleTransaction() throws Exception {
TransactionsMsgHandler handler = new TransactionsMsgHandler();
try {
TronNetDelegate tronNetDelegate = Mockito.mock(TronNetDelegate.class);
AdvService advService = Mockito.mock(AdvService.class);
ChainBaseManager chainBaseManager = Mockito.mock(ChainBaseManager.class);

Field f1 = TransactionsMsgHandler.class.getDeclaredField("tronNetDelegate");
f1.setAccessible(true);
f1.set(handler, tronNetDelegate);
Field f2 = TransactionsMsgHandler.class.getDeclaredField("advService");
f2.setAccessible(true);
f2.set(handler, advService);
Field f3 = TransactionsMsgHandler.class.getDeclaredField("chainBaseManager");
f3.setAccessible(true);
f3.set(handler, chainBaseManager);

PeerConnection peer = Mockito.mock(PeerConnection.class);

BalanceContract.TransferContract tc = BalanceContract.TransferContract.newBuilder()
.setAmount(10)
.setOwnerAddress(ByteString.copyFrom(ByteArray.fromHexString("121212a9cf")))
.setToAddress(ByteString.copyFrom(ByteArray.fromHexString("232323a9cf")))
.build();
long now = System.currentTimeMillis();
Protocol.Transaction trx = Protocol.Transaction.newBuilder().setRawData(
Protocol.Transaction.raw.newBuilder()
.setTimestamp(now)
.setExpiration(now + 60_000)
.setRefBlockNum(1)
.addContract(Protocol.Transaction.Contract.newBuilder()
.setType(Protocol.Transaction.Contract.ContractType.TransferContract)
.setParameter(Any.pack(tc)).build()).build())
.build();
TransactionMessage trxMsg = new TransactionMessage(trx);

Method handleTx = TransactionsMsgHandler.class.getDeclaredMethod(
"handleTransaction", PeerConnection.class, TransactionMessage.class);
handleTx.setAccessible(true);

// happy path → push and broadcast
Mockito.when(chainBaseManager.getNextBlockSlotTime()).thenReturn(now);
handleTx.invoke(handler, peer, trxMsg);
Mockito.verify(advService).broadcast(trxMsg);

// P2pException BAD_TRX → disconnect
Mockito.doThrow(new P2pException(TypeEnum.BAD_TRX, "bad"))
.when(tronNetDelegate).pushTransaction(Mockito.any());
handleTx.invoke(handler, peer, trxMsg);
Mockito.verify(peer).setBadPeer(true);
Mockito.verify(peer).disconnect(Protocol.ReasonCode.BAD_TX);
} finally {
handler.close();
}
}

class TrxEvent {

@Getter