KAFKA-20678: Add timeout in replica manager log reader remote read.#22707
KAFKA-20678: Add timeout in replica manager log reader remote read.#22707smjn wants to merge 6 commits into
Conversation
apoorvmittal10
left a comment
There was a problem hiding this comment.
Thanks for the PR, one comment.
| future.completeExceptionally(new TimeoutException( | ||
| "Remote read for " + remoteStorageFetchInfo + " did not complete within " + timeoutMs + " ms.")); | ||
| } |
There was a problem hiding this comment.
Don't we also need not to cancel the pending future for remote calls?
There was a problem hiding this comment.
yes correct, we are doing it - the code is as follows. I will rename the remote future as remoteFuture:
TimeoutTask timeoutTask = new TimeoutTask(delay) {
// cancel remote task. If remote task is already completed - below will be a no-op.
() -> {remoteFuture.completeExceptionally(new TimoutException())};
}
// in case the remoteFuture completed before timeoutTask - cancel the timeoutTask. If the timeoutTask has
// already completed - below will be a no-op
remoteFuture.whenComplete((val, exp) -> timoutTask.cancel());
| }; | ||
| // Cancel the timer task once the read completes (either outcome) so it does not linger in the wheel. | ||
| future.whenComplete((info, exception) -> timeoutTask.cancel()); | ||
| replicaManager.addShareFetchTimerRequest(timeoutTask); |
There was a problem hiding this comment.
Also why share fetch timer API being used here? Also should we make it callers responsibility to handle the timeouts?
There was a problem hiding this comment.
Hi,
ReplicaManager already has a timer wheel for consumption by the QFK code (dedicated). This was the only method which exposes that timer so wanted to reuse for resource efficiency reasons.
Will add a comment to clarify.
I decided to add it here as current callers use the same logic to terminate the remote future with the same remoteFetchMaxWaitMs.
We can move to caller as well if their termination logic is divergent - but this would mean that we will lose possibility of partial results from the local read.
| future.completeExceptionally(new TimeoutException( | ||
| "Remote read for " + remoteStorageFetchInfo + " did not complete within " + timeoutMs + " ms.")); |
There was a problem hiding this comment.
Just completing it exceptionally might not be a good idea as the method can still return the locally fetched data while skipping the remote storage partitions.
There was a problem hiding this comment.
Hi, this future is ONLY for the remote result. readRemote is a package private method. The overall future containing combined results is in readAsync which will supply the appropriate partial local results and set error in the LogReadResult.
| } | ||
| if (remoteFetchDataInfo == null) { | ||
| // We want to return successful local read results so no skipping. | ||
| return withInfoAndError(logReadResult, localFetchDataInfo, Errors.UNKNOWN_SERVER_ERROR); |
There was a problem hiding this comment.
Question - When is this scenario possible?
There was a problem hiding this comment.
It is purely defensive - not reachable in current impl. The async read call on replicaManager finally results in call to ReplicaManager.call which always results in a completed object.
I will add a comment to clarify.
There was a problem hiding this comment.
lets just get rid of it then
| LinkedHashMap<TopicIdPartition, CompletableFuture<LogReadResult>> futures = new LinkedHashMap<>(); | ||
| for (TopicIdPartition topicIdPartition : partitionsToFetch) { | ||
| LogReadResult logReadResult = localReadResults.get(topicIdPartition); | ||
| if (logReadResult == null) { |
There was a problem hiding this comment.
Similar for this case, when can logReadResult be null. If it cannot be null, then we shouldn't handle it, I think
ReplicaManagerLogReader.readAsync fetches data from remote storage, if
enabled. The response is async. Current code does not bound this call to
remote storage. This PR uses the existing
remoteFetchMaxWaitMstoalleviate the situation by scheduling a timed killer task into the
replica manager's timer wheel.
A new integ test has been added to ShareConsumerDLQTest to verify e-2-e
tiering based DLQ.
Reviewers: Apoorv Mittal apoorvmittal10@gmail.com, Abhinav Dixit
adixit@confluent.io