diff --git a/src/java/org/apache/cassandra/tcm/PeerLogFetcher.java b/src/java/org/apache/cassandra/tcm/PeerLogFetcher.java index 12cdc18e5d36..4a131954cb06 100644 --- a/src/java/org/apache/cassandra/tcm/PeerLogFetcher.java +++ b/src/java/org/apache/cassandra/tcm/PeerLogFetcher.java @@ -88,34 +88,35 @@ private Future fetchLogEntriesAndWaitInternal(InetAddressAndPor return res; } - Promise fetchRes = new AsyncPromise<>(); + Promise fetchFromRemote = new AsyncPromise<>(); + Future appendToLog = fetchFromRemote.map(logState -> { + log.append(logState); + ClusterMetadata fetched = log.waitForHighestConsecutive(); + if (fetched.epoch.isEqualOrAfter(awaitAtleast)) + { + TCMMetrics.instance.peerLogEntriesFetched(before, logState.latestEpoch()); + return fetched; + } + else + { + throw new IllegalStateException(String.format("Queried for epoch %s, but could not catch up. Current epoch: %s", awaitAtleast, fetched.epoch)); + } + }); + logger.info("Fetching log from {}, at least {}", remote, awaitAtleast); try (Timer.Context ctx = TCMMetrics.instance.fetchPeerLogLatency.time()) { RemoteProcessor.sendWithRetries(Verb.TCM_FETCH_PEER_LOG_REQ, new FetchPeerLog(before), - fetchRes, + fetchFromRemote, new RemoteProcessor.CandidateIterator(Collections.singletonList(remote), false), Retry.untilElapsed(DatabaseDescriptor.getCmsAwaitTimeout().to(TimeUnit.NANOSECONDS), TCMMetrics.instance.fetchLogRetries)); - - return fetchRes.map((logState) -> { - log.append(logState); - ClusterMetadata fetched = log.waitForHighestConsecutive(); - if (fetched.epoch.isEqualOrAfter(awaitAtleast)) - { - TCMMetrics.instance.peerLogEntriesFetched(before, logState.latestEpoch()); - return fetched; - } - else - { - throw new IllegalStateException(String.format("Queried for epoch %s, but could not catch up. Current epoch: %s", awaitAtleast, fetched.epoch)); - } - }); - + return appendToLog; } catch (Throwable t) { - fetchRes.cancel(true); + fetchFromRemote.cancel(true); + appendToLog.cancel(true); JVMStabilityInspector.inspectThrowable(t); logger.warn("Unable to fetch log entries from " + remote, t);