
KAFKA-20678: Add timeout in replica manager log reader remote read. #22707

Open
smjn wants to merge 6 commits into apache:trunk from smjn:KAFKA-20678-2

@smjn smjn commented Jun 30, 2026

Collaborator

ReplicaManagerLogReader.readAsync fetches data from remote storage when
remote storage is enabled. The response is asynchronous, but the current
code places no upper bound on how long the remote read may take. This PR
bounds the call with the existing remoteFetchMaxWaitMs by scheduling a
timeout task on the replica manager's timer wheel; the task fails the
remote read if it has not completed in time.
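The idea in the description above can be sketched with the JDK alone. This is a minimal illustration, not the PR's code: a `ScheduledExecutorService` stands in for the replica manager's timer wheel, and `BoundedRemoteRead` and `withTimeout` are hypothetical names chosen for this example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in: the scheduler plays the role of the timer wheel.
final class BoundedRemoteRead {

    // Fails remoteFuture with a TimeoutException if it is still pending after timeoutMs.
    static <T> CompletableFuture<T> withTimeout(CompletableFuture<T> remoteFuture,
                                                long timeoutMs,
                                                ScheduledExecutorService timer) {
        ScheduledFuture<?> timeoutTask = timer.schedule(() -> {
            remoteFuture.completeExceptionally(new TimeoutException(
                "Remote read did not complete within " + timeoutMs + " ms."));
        }, timeoutMs, TimeUnit.MILLISECONDS);
        // Once the read finishes (either outcome), drop the pending timer task.
        remoteFuture.whenComplete((value, error) -> timeoutTask.cancel(false));
        return remoteFuture;
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            // A read that never completes on its own.
            CompletableFuture<String> stuck = withTimeout(new CompletableFuture<>(), 50, timer);
            try {
                stuck.get(5, TimeUnit.SECONDS);
            } catch (ExecutionException e) {
                System.out.println("Failed with: " + e.getCause().getClass().getSimpleName());
            }
        } finally {
            timer.shutdownNow();
        }
    }
}
```

In this sketch the caller still sees a single future; the timeout only decides which outcome arrives first.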

A new integration test has been added to ShareConsumerDLQTest to verify
the end-to-end, tiering-based DLQ flow.

Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Abhinav Dixit
<adixit@confluent.io>

@github-actions github-actions Bot added the triage (PRs from the community), core (Kafka Broker), and KIP-932 (Queues for Kafka) labels Jun 30, 2026
@smjn smjn requested a review from apoorvmittal10 June 30, 2026 08:33
@github-actions github-actions Bot added the build (Gradle build or GitHub Actions) and clients labels Jun 30, 2026

@apoorvmittal10 apoorvmittal10 left a comment

Contributor

Thanks for the PR, one comment.

Comment on lines +235 to +237
future.completeExceptionally(new TimeoutException(
"Remote read for " + remoteStorageFetchInfo + " did not complete within " + timeoutMs + " ms."));
}

Contributor

Don't we also need to cancel the pending future for the remote call?

Collaborator Author

Yes, correct, we are already doing that. The code is roughly as follows. I will rename the remote future to remoteFuture:

TimeoutTask timeoutTask = new TimeoutTask(delay) {
    @Override
    public void run() {
        // Cancel the remote task. If the remote task has already completed, this is a no-op.
        remoteFuture.completeExceptionally(new TimeoutException());
    }
};

// If remoteFuture completes before timeoutTask fires, cancel timeoutTask.
// If timeoutTask has already run, this is a no-op.
remoteFuture.whenComplete((val, exp) -> timeoutTask.cancel());
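The no-op behaviour this reply relies on is a property of `CompletableFuture` itself: `completeExceptionally` returns `false` and leaves the result untouched when the future has already completed. A small check, using only the JDK (the class and method names here are hypothetical):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

final class NoOpCompletionDemo {

    // Returns true only if this call actually moved the future to a failed state.
    static boolean tryTimeout(CompletableFuture<String> remoteFuture) {
        return remoteFuture.completeExceptionally(new TimeoutException("remote read timed out"));
    }

    public static void main(String[] args) {
        // The remote read finished first: the late timeout changes nothing.
        CompletableFuture<String> finished = CompletableFuture.completedFuture("records");
        System.out.println(tryTimeout(finished));   // false
        System.out.println(finished.join());        // records

        // The remote read is still pending: the timeout wins.
        CompletableFuture<String> pending = new CompletableFuture<>();
        System.out.println(tryTimeout(pending));                 // true
        System.out.println(pending.isCompletedExceptionally());  // true
    }
}
```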

};
// Cancel the timer task once the read completes (either outcome) so it does not linger in the wheel.
future.whenComplete((info, exception) -> timeoutTask.cancel());
replicaManager.addShareFetchTimerRequest(timeoutTask);

Contributor

Also, why is the share fetch timer API being used here? And should we make it the caller's responsibility to handle the timeouts?

@smjn smjn Jul 2, 2026

Collaborator Author

Hi,
ReplicaManager already has a dedicated timer wheel for use by the QFK code. This was the only method that exposes that timer, so I wanted to reuse it for resource efficiency.
I will add a comment to clarify.
I decided to add the timeout here because the current callers use the same logic to terminate the remote future, with the same remoteFetchMaxWaitMs.
We can move it to the callers if their termination logic diverges, but that would mean losing the possibility of partial results from the local read.

Comment on lines +235 to +236
future.completeExceptionally(new TimeoutException(
"Remote read for " + remoteStorageFetchInfo + " did not complete within " + timeoutMs + " ms."));

Contributor

Just completing it exceptionally might not be a good idea as the method can still return the locally fetched data while skipping the remote storage partitions.

Collaborator Author

Hi, this future is ONLY for the remote result. readRemote is a package-private method. The overall future containing the combined results is in readAsync, which supplies the appropriate partial local results and sets the error in the LogReadResult.
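To illustrate the split described in this reply, here is a rough sketch of a combined future that keeps the local data and records an error when the remote future fails. `ReadResult` and `combine` are hypothetical and only loosely mirror the roles of LogReadResult and readAsync; the actual types in the PR are not reproduced here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

final class PartialResultDemo {

    // Hypothetical result type, invented for this sketch.
    static final class ReadResult {
        final String localData;
        final String remoteData; // null when the remote read failed
        final String error;      // null on success

        ReadResult(String localData, String remoteData, String error) {
            this.localData = localData;
            this.remoteData = remoteData;
            this.error = error;
        }
    }

    // The remote future may fail, but the combined future always completes normally
    // and always carries the local data.
    static CompletableFuture<ReadResult> combine(String localData,
                                                 CompletableFuture<String> remoteFuture) {
        return remoteFuture.handle((remoteData, failure) -> failure == null
            ? new ReadResult(localData, remoteData, null)
            : new ReadResult(localData, null, failure.getClass().getSimpleName()));
    }

    public static void main(String[] args) {
        CompletableFuture<String> remote = new CompletableFuture<>();
        CompletableFuture<ReadResult> combined = combine("local-records", remote);
        remote.completeExceptionally(new TimeoutException("remote read timed out"));
        ReadResult result = combined.join();
        System.out.println(result.localData + " / " + result.remoteData + " / " + result.error);
        // prints: local-records / null / TimeoutException
    }
}
```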

@github-actions github-actions Bot removed the triage (PRs from the community) label Jul 2, 2026
}
if (remoteFetchDataInfo == null) {
// We want to return successful local read results so no skipping.
return withInfoAndError(logReadResult, localFetchDataInfo, Errors.UNKNOWN_SERVER_ERROR);

Contributor

Question - When is this scenario possible?

@smjn smjn Jul 2, 2026

Collaborator Author

It is purely defensive and not reachable in the current implementation. The async read call on replicaManager eventually results in a call to ReplicaManager.call, which always produces a completed object.
I will add a comment to clarify.

Contributor

Let's just get rid of it then.

LinkedHashMap<TopicIdPartition, CompletableFuture<LogReadResult>> futures = new LinkedHashMap<>();
for (TopicIdPartition topicIdPartition : partitionsToFetch) {
LogReadResult logReadResult = localReadResults.get(topicIdPartition);
if (logReadResult == null) {

Contributor

Similarly for this case: when can logReadResult be null? If it cannot be null, then I think we shouldn't handle it.

@smjn smjn requested a review from adixitconfluent July 2, 2026 11:47

Labels

build (Gradle build or GitHub Actions), clients, core (Kafka Broker), KIP-932 (Queues for Kafka)


3 participants