KAFKA-20246: Add clusterId and nodeId to ApiVersionsRequest (3/N) #22512

Open
AndrewJSchofield wants to merge 10 commits into apache:trunk from AndrewJSchofield:KAFKA-20246-4

Conversation

AndrewJSchofield commented Jun 8, 2026
Member

Part of the implementation of
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1242%3A+Detection+and+handling+of+misrouted+connections.

Support coordinator connections and mark ApiVersions v5 API as stable.

Reviewers: TaiJuWu <tjwu1217@gmail.com>, David Jacot <david.jacot@gmail.com>

github-actions bot added the core, clients, and small labels Jun 8, 2026

rajinisivaram left a comment

Contributor

@AndrewJSchofield Thanks for the PR, left a couple of questions.

// To avoid those, only check if the node ID is less than half of Integer.MAX_VALUE.
if (clusterId != null && nodeId >= 0 && nodeId < Integer.MAX_VALUE / 2) {
// We can get the real node ID by subtracting from Integer.MAX_VALUE.
if (nodeId > Integer.MAX_VALUE / 2) {

Contributor

There is an assumption here that node ids are less than Integer.MAX_VALUE / 2. But that is not a documented limitation for broker ids, right? Isn't it possible that some deployments use larger broker ids, in which case, the larger number is the actual broker id and the smaller one is the coordinator id?

Member Author

OK, but in that case, wouldn't the whole principle of using large node IDs for coordinator connections be broken? We could end up with one broker using a very high node ID and another that coincidentally has the same node ID when it is connected to as a coordinator. @dajac What do you think? What's a safe path here?

Contributor

I may be wrong, but I thought that for the coordinator we weren't relying on node ids being low; we were just relying on the node ids being close together, to avoid conflicts between broker node ids and the coordinator ids. Let's see what @dajac says.

Member Author

Yes, let's. I can think of a way of improving this area if simple arithmetic is not enough.

Member

The coordinator id is just the broker id of the broker hosting the coordinator. Hence, it should usually be low, say < 100000. However, I agree with @rajinisivaram that this is not documented anywhere. Honestly, I don't like the MAX - id hack at all. I wonder if we should rather fix the underlying issue, which is that we limit the number of connections by node id. Could we have a way to tell the network client that we want a connection which is not limited by the maximum number of connections allowed? I haven't checked how feasible it is, though.

My understanding is also that the connection to the group coordinator won't have the nodeId set which is also weird from a design perspective.
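
To make the concern in this thread concrete, here is a minimal sketch of the "MAX - id" scheme and the ambiguity it creates once a real broker id is allowed to exceed Integer.MAX_VALUE / 2. The class and method names are hypothetical and chosen for illustration only; this is not the actual client code.

```java
// Hypothetical sketch of the "MAX - id" encoding; not the Kafka client implementation.
class CoordinatorIdAmbiguity {

    // Derive a distinct connection id for a coordinator hosted on brokerId.
    static int coordinatorConnectionId(int brokerId) {
        return Integer.MAX_VALUE - brokerId;
    }

    // Recover the broker id, ASSUMING real broker ids stay below MAX_VALUE / 2.
    static int decodeBrokerId(int connectionId) {
        if (connectionId > Integer.MAX_VALUE / 2) {
            return Integer.MAX_VALUE - connectionId;
        }
        return connectionId;
    }

    public static void main(String[] args) {
        // Typical case: a low broker id round-trips correctly.
        int encoded = coordinatorConnectionId(5);
        System.out.println(decodeBrokerId(encoded));      // 5

        // A deployment using a very large broker id breaks the assumption:
        // the real id is misread as a coordinator connection to broker 5.
        int largeBrokerId = Integer.MAX_VALUE - 5;
        System.out.println(decodeBrokerId(largeBrokerId)); // 5, not the real id
        System.out.println(encoded == largeBrokerId);      // true: the ids collide
    }
}
```

The decoding is only safe if the upper half of the integer range is reserved, which is exactly the limitation that is not documented for broker ids.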

Member

Could we override idString in Node to be coordinator-{id} to force the creation of a new connection without hacking the id?

Member Author

Yes, overriding idString for coordinator connections does work.

// Version 5 introduces ClusterId and NodeId checking and REBOOTSTRAP_REQUIRED error (KIP-1242).
"validVersions": "0-5",
"flexibleVersions": "3+",
"latestVersionUnstable": true,

Contributor

This change is not related to the KIP?

Member Author

While the implementation was incomplete, I marked the latest version as unstable. This change removes the unstable indication so that the new RPC version is now considered stable. That's my intent anyway.

Contributor

Makes sense.

github-actions bot added the consumer label and removed the small label Jun 19, 2026
github-actions bot added the tools label Jun 22, 2026
Member Author

The failure of KafkaAdminClientTest.testUnreachableBootstrapNode is not caused by this PR, because there's a latent bug in that test. I'll put in a fix for that too.

this(id, host, port, rack, isFenced, false);
}

public Node(int id, String host, int port, String rack, boolean isFenced, boolean isCoordinator) {

Member

I wonder if we should generalize it a bit more so we could also use it to have dedicated connections to the group coordinator and the transaction coordinator in the producer.

Member Author

Hmm. That's a thought. I will take a look. This is one of those PRs where I'm gradually learning about areas I've not really dug into before.

Member Author

I've taken a look and this is a bigger piece of work. It's also not necessary in this PR because the way TransactionManager handles the Nodes is much simpler. I propose not to do this enhancement as part of this PR.

if (id < 0) {
throw new IllegalArgumentException("Node id for coordinator node cannot be negative");
}
this.idString = "+" + id;

Member

Why + rather than, e.g., coordinator-?

Member Author

Because I want integer parsing to work. All we need is a difference. I feel there's no need to make this more difficult. I did start with coordinator-{%d}, but then I needed a parsing method in Node and I decided the complexity wasn't really worth it.
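
The trade-off described here can be sketched as follows. Integer.parseInt accepts a leading "+" sign, so a "+"-prefixed ID string is distinct from the plain one yet still parses to the same number, whereas a coordinator- prefix would need its own parsing method. The helper below is a hypothetical stand-in for the idString assignment in the diff, not the actual Node code.

```java
// Hypothetical helper mirroring the "+" prefix from the diff; not the real Node class.
class CoordinatorIdString {

    static String idString(int id, boolean isCoordinator) {
        if (!isCoordinator) {
            return Integer.toString(id);
        }
        if (id < 0) {
            throw new IllegalArgumentException("Node id for coordinator node cannot be negative");
        }
        return "+" + id;
    }

    public static void main(String[] args) {
        String broker = idString(3, false);      // "3"
        String coordinator = idString(3, true);  // "+3"

        // Different strings, so they act as different connection keys...
        System.out.println(broker.equals(coordinator));       // false
        // ...but both parse back to the same numeric node id.
        System.out.println(Integer.parseInt(broker));         // 3
        System.out.println(Integer.parseInt(coordinator));    // 3

        // A textual prefix would not parse without extra code.
        try {
            Integer.parseInt("coordinator-3");
        } catch (NumberFormatException e) {
            System.out.println("coordinator-3 needs a custom parser");
        }
    }
}
```

The negative-id check matters for the same reason: "+" followed by a negative number, such as "+-1", is not parseable.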

@@ -1138,10 +1138,7 @@ private void handleInitiateApiVersionRequests(long now) {
if (metadataRecoveryStrategy != MetadataRecoveryStrategy.NONE && metadataClusterCheckEnable) {
String clusterId = this.metadataUpdater.clusterId();
int nodeId = Integer.parseInt(node);

Member

Can't we just use node.id here?

Member Author

The Node is generally represented by its ID string in NetworkClient, for example disconnect(String nodeId) and Map<String, ...> nodesNeedingApiVersionsFetch. For the protocol, I want to obtain the numeric ID and I have the ID string in hand, not the actual Node reference.
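
As a rough illustration of this point, the sketch below keeps per-connection state keyed by ID string and derives the numeric node ID for the protocol from the key alone. The class, the method, and the map are hypothetical and only loosely modelled on the string-keyed structures named above; the real NetworkClient is considerably more involved.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of string-keyed connection state; not the real NetworkClient.
class ConnectionKeySketch {

    // Map each connection ID string to the numeric node id that would go on the wire.
    static Map<String, Integer> protocolNodeIds(Iterable<String> connectionIds) {
        Map<String, Integer> result = new LinkedHashMap<>();
        for (String connectionId : connectionIds) {
            // Only the string is in hand here, so the numeric id is parsed from it.
            result.put(connectionId, Integer.parseInt(connectionId));
        }
        return result;
    }

    public static void main(String[] args) {
        // A broker connection and a coordinator connection to the same broker.
        Map<String, Integer> ids = protocolNodeIds(Arrays.asList("3", "+3"));
        System.out.println(ids.size());     // 2 separate connections
        System.out.println(ids.get("3"));   // 3
        System.out.println(ids.get("+3"));  // 3
    }
}
```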

Member

My bad. I thought that node was a Node :).

dajac left a comment

Member

lgtm, thanks

4 participants