KAFKA-20246: Add clusterId and nodeId to ApiVersionsRequest (3/N)#22512
KAFKA-20246: Add clusterId and nodeId to ApiVersionsRequest (3/N)#22512AndrewJSchofield wants to merge 10 commits into
Conversation
rajinisivaram
left a comment
There was a problem hiding this comment.
@AndrewJSchofield Thanks for the PR, left a couple of questions.
| // To avoid those, only check if the node ID is less than half of Integer.MAX_VALUE. | ||
| if (clusterId != null && nodeId >= 0 && nodeId < Integer.MAX_VALUE / 2) { | ||
| // We can get the real node ID by subtracting from Integer.MAX_VALUE. | ||
| if (nodeId > Integer.MAX_VALUE / 2) { |
There was a problem hiding this comment.
There is an assumption here that node ids are less than Integer.MAX_VALUE / 2. But that is not a documented limitation for broker ids, right? Isn't it possible that some deployments use larger broker ids, in which case, the larger number is the actual broker id and the smaller one is the coordinator id?
There was a problem hiding this comment.
OK, but in that case, wouldn't the entire principle of using large node IDs for the coordinator connections be entirely broken then? We could end up a broker using a very high node ID, and another one which coincidentally has the same node ID when it is being connected as a coordinator. @dajac What do you think? What's a safe path here?
There was a problem hiding this comment.
I may be wrong, but I thought for the coordinator, we weren't relying on node ids being low, we were just relying on the node ids being close together, to avoid conflicts between broker node ids and the coordinator ids. Let's see what @dajac says.
There was a problem hiding this comment.
Yes, let's. I can think of a way of improving this area if simple arithmetic is not enough.
There was a problem hiding this comment.
The coordinator id is just the broker id of the broker hosting the coordinator. Hence, it should usually be low, say < 100000. However, I agree with @rajinisivaram that this is not documented anywhere. Honestly, I don't like the MAX - id hack at all. I wonder if we should rather fix the underlying issue which is that we limit the number of connections by node id. Could you have a way to tell the network client that we want a connection which is not limited by the maximum number of connections allowed? I haven't check how feasible it it though.
My understanding is also that the connection to the group coordinator won't have the nodeId set which is also weird from a design perspective.
There was a problem hiding this comment.
Could we override idString in Node to be coordinator-{id} to force the creating of a new connection without hacking the id?
There was a problem hiding this comment.
Yes, overriding idString for coordinator connections does work.
| // Version 5 introduces ClusterId and NodeId checking and REBOOTSTRAP_REQUIRED error (KIP-1242). | ||
| "validVersions": "0-5", | ||
| "flexibleVersions": "3+", | ||
| "latestVersionUnstable": true, |
There was a problem hiding this comment.
This change is not related to the KIP?
There was a problem hiding this comment.
While the implementation was incomplete, I marked the latest version as unstable. This change removes the unstable indication so that new RPC version is now considered stable. That's my intent anyway.
# Conflicts: # docs/getting-started/upgrade.md
|
The failure of |
| this(id, host, port, rack, isFenced, false); | ||
| } | ||
|
|
||
| public Node(int id, String host, int port, String rack, boolean isFenced, boolean isCoordinator) { |
There was a problem hiding this comment.
I wonder if we should generalize it a bit more so we could also use it to have dedicated connection to group coordinator and transaction coordinator in the producer.
There was a problem hiding this comment.
Hmm. That's a thought. I will take a look. This is one of those PRs where I'm gradually learning about areas I've not really dug into before.
There was a problem hiding this comment.
I've taken a look and this is a bigger piece of work. It's also not necessary in this PR because the way TransactionManager handles the Nodes is much simpler. I propose not to do this enhancement as part of this PR.
| if (id < 0) { | ||
| throw new IllegalArgumentException("Node id for coordinator node cannot be negative"); | ||
| } | ||
| this.idString = "+" + id; |
There was a problem hiding this comment.
why + vs using e.g. coordinator-?
There was a problem hiding this comment.
Because I want integer parsing to work. All we need is a difference. There's no need to make this more difficult I feel. I did start with coordinator-[%d} but then I needed a parsing method in Node and I decided the complexity wasn't really worth it.
| @@ -1138,10 +1138,7 @@ private void handleInitiateApiVersionRequests(long now) { | |||
| if (metadataRecoveryStrategy != MetadataRecoveryStrategy.NONE && metadataClusterCheckEnable) { | |||
| String clusterId = this.metadataUpdater.clusterId(); | |||
| int nodeId = Integer.parseInt(node); | |||
There was a problem hiding this comment.
The Node is generally represented by its ID string in NetworkClient, for example disconnect(String nodeId) and Map<String, ...> nodesNeedingApiVersionsFetch. For the protocol, I want to obtain the numeric ID and I have the ID string in hand, not the actual Node reference.
There was a problem hiding this comment.
My bad. I thought that node was a Node :).
Part of the implementation of
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1242%3A+Detection+and+handling+of+misrouted+connections.
Support coordinator connections and mark ApiVersions v5 API as stable.
Reviewers: TaiJuWu tjwu1217@gmail.com, David Jacot
david.jacot@gmail.com