Skip to content

[CELEBORN-2315] Add iterator fully-consumed validation after shuffle write#3672

Open
xumingming wants to merge 2 commits into
apache:mainfrom
xumingming:iterator-fully-consumed-check
Open

[CELEBORN-2315] Add iterator fully-consumed validation after shuffle write#3672
xumingming wants to merge 2 commits into
apache:mainfrom
xumingming:iterator-fully-consumed-check

Conversation

@xumingming
Copy link
Copy Markdown
Contributor

What changes were proposed in this pull request?

Adds a post-write safety check to HashBasedShuffleWriter and SortBasedShuffleWriter: after the write loop completes, verify the input iterator was fully consumed. If records remain, kill the task with TaskKilledException. This guards against silent data loss.

Why are the changes needed?

It could give another layer of correctness guarantee.

Does this PR resolve a correctness bug?

Enhance correctness guarantee.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

UT

@xumingming xumingming force-pushed the iterator-fully-consumed-check branch 2 times, most recently from 5a50c71 to dbd6473 Compare April 23, 2026 12:33
@xumingming
Copy link
Copy Markdown
Contributor Author

@gauravkm @RexXiong @SteNicholas Could you also take a look at this one?

@xumingming
Copy link
Copy Markdown
Contributor Author

@RexXiong @SteNicholas @gauravkm Gentle ping :)

@afterincomparableyum
Copy link
Copy Markdown
Contributor

i’ll help take a look at this PR over the next couple days

Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR adds a correctness guard to Celeborn’s Spark shuffle writers (Spark 2 and Spark 3 variants): after finishing the write path, it validates that the upstream records iterator was fully consumed and kills the task if it wasn’t, reducing the risk of silent data loss.

Changes:

  • Add SparkUtils.assertIteratorFullyConsumed(...) helper and invoke it at the end of shuffle-writer close paths.
  • Refactor Hash/Sort-based writers’ write flows to propagate an iteratorHasNext signal from the write loop to close/validation.
  • Extend TaskInterruptedHelper to support an optional message in TaskKilledException and add unit tests for the new assertion.

Reviewed changes

Copilot reviewed 9 out of 9 changed files in this pull request and generated 8 comments.

Show a summary per file
File Description
client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java Adds assertIteratorFullyConsumed helper (Spark 3).
client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java Adds assertIteratorFullyConsumed helper (Spark 2).
client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java Returns iterator-consumption status from doWrite and validates on close.
client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java Same iterator-consumption validation wiring for hash-based writer.
client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java Same iterator-consumption validation wiring (Spark 2).
client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java Same iterator-consumption validation wiring (Spark 2).
client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/TaskInterruptedHelper.java Adds overload to include a message in TaskKilledException.
client-spark/spark-3/src/test/java/org/apache/spark/shuffle/celeborn/CelebornShuffleWriterSuiteBase.java Adds unit tests for the new iterator-consumed assertion and mocks kill reason.
client-spark/spark-2/src/test/java/org/apache/spark/shuffle/celeborn/CelebornShuffleWriterSuiteBase.java Adds unit tests for the new iterator-consumed assertion and mocks kill reason.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@xumingming xumingming force-pushed the iterator-fully-consumed-check branch from dbd6473 to bba2479 Compare May 13, 2026 09:06
@xumingming
Copy link
Copy Markdown
Contributor Author

@SteNicholas I have made all the necessary changes, can you take a look at again?

@SteNicholas
Copy link
Copy Markdown
Member

@xumingming, please take a look at the comments from claude code:

Code Review — [CELEBORN-2315] Add iterator fully-consumed validation after shuffle write 
  (apache/celeborn#3672)

  Overview

  Adds a post-write safety check to HashBasedShuffleWriter and SortBasedShuffleWriter (both spark-2
  and spark-3): doWrite() now returns whether the input iterator still has records, and
  close(iteratorHasNext) calls SparkUtils.assertIteratorFullyConsumed(...) which kills the task with
   TaskKilledException if records remain. TaskInterruptedHelper.throwTaskKillException is extended
  to carry an optional message merged with Spark's kill reason. +171/-46 across 9 files.

  Correctness — solid, with points to verify

  - Check placement is well-reasoned and correct. In every writer the assertion is placed after the
  pusher is drained / buffers returned and before shuffleClient.mapperEnd(...). This is the key
  invariant — a partial map output is never committed to the shuffle service — and it's right.
  - mapSideCombine handling is a genuine improvement. Capturing combinedIterator and checking
  combinedIterator.hasNext() (rather than the raw records) is the correct level: write0 consumes the
   combined iterator, and its exhaustion implies the input was drained by the aggregator.
  - Null/reason merging in throwTaskKillException(String) covers all four combinations cleanly.
  - ⚠️  finally { if (needCleanupPusher) cleanupPusher(); } still runs when the assertion throws.
  TaskKilledException isn't an InterruptedException, so it bypasses the catch; needCleanupPusher is
  still true, so cleanupPusher() executes after the pusher already terminated. Two things to
  confirm: (1) cleanupPusher() is safe/idempotent post-termination, and (2) if it throws
  IOException, that exception will supersede the TaskKilledException (Java finally semantics),
  masking the real kill reason. Worth a comment or guarding.
  - ⚠️  Generics: scala.collection.Iterator<?> combinedIterator = 
  dep.aggregator().get().combineValuesByKey(...) then write0(combinedIterator). The original passed
  the expression inline (type inferred). Confirm write0 accepts the wildcard without an
  unchecked/raw warning regression.
  - Minor: records.hasNext() is assumed side-effect-free/idempotent. True for standard Spark
  iterators; fine to rely on but worth being aware of for exotic interruptible/completion iterators.

  Behavior change / risk

  - This converts a silent condition into a hard task kill. For a ShuffleMapTask the writer must
  consume the full iterator, so a non-empty iterator genuinely indicates data loss — semantically
  correct. But any false positive (a custom iterator whose hasNext() blocks/recomputes, or an
  unforeseen early-termination path) would now fail previously-"successful" tasks.
  - Suggestion: gate the assertion behind a CelebornConf flag (default on) so it can be disabled
  quickly if false positives surface in production. Low cost, much safer rollout for a correctness
  guard touching every shuffle write.

  Test coverage — too shallow for the risk

  - The only new tests call SparkUtils.assertIteratorFullyConsumed(false/true) directly. They do not
   cover the actual integration: that a real writer with a partially-consumed iterator triggers the
  kill, that the normal full-consumption path does not falsely trip, that the kill happens before
  mapperEnd, or the new throwTaskKillException(String) branches (spark reason + message).
  - Recommend at least: (1) an assertion in the existing check(...) writer suites that a normal full
   write does not throw (guards against false positives across fast-write / combine / plain paths);
  (2) a test driving a writer/doWrite with an iterator left non-empty and asserting
  TaskKilledException is raised before mapperEnd.
  
  Consistency / style

  - @VisibleForTesting and the explanatory return-contract comment are present only on spark-3 
  SortBasedShuffleWriter.doWrite; the new doWrite in spark-2 Sort and both Hash writers have
  neither. Make the four doWrite methods consistent (annotation + one-line "returns true if not
  fully consumed" doc).
  - The "why this placement" comment exists on the Hash writers but not on either
  SortBasedShuffleWriter before assertIteratorFullyConsumed, even though placement-before-mapperEnd
  matters equally there. Add the same rationale so a future refactor doesn't move it past mapperEnd.
  - spark-3 test adds dependency.mapSideCombine()/aggregator() mocks but spark-2 test does not —
  confirm spark-2 suite still exercises the new doWrite branch (or is relying on a real dependency).

  Verdict
  
  Sound, well-placed change with correct core logic and good handling of the combine path. No
  blocking bug found. Before merge I'd want: a config gate for safe rollout, deeper
  (integration-level) tests of the guard and the new message branches, the finally/cleanupPusher
  interaction confirmed, and the cross-writer @VisibleForTesting/comment consistency tidied.

Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.

@xumingming
Copy link
Copy Markdown
Contributor Author

@SteNicholas Answers to Claude's comments:

  • ⚠️ finally { if (needCleanupPusher) cleanupPusher(); } still runs when the assertion throws.
    TaskKilledException isn't an InterruptedException, so it bypasses the catch; needCleanupPusher is
    still true, so cleanupPusher() executes after the pusher already terminated. Two things to
    confirm: (1) cleanupPusher() is safe/idempotent post-termination, and (2) if it throws
    IOException, that exception will supersede the TaskKilledException (Java finally semantics),
    masking the real kill reason. Worth a comment or guarding.

This is actually a good catch. I have moved the assertIteratorFullyConsumed call site, so when we throw the TaskKilledException, the cleanup has not been done yet -- be consistent as original design.

BTW: Currently the needCleanupPusher is very tricky and fragile, I'd like a add another PR to optimize it.

  • ⚠️ Generics: scala.collection.Iterator<?> combinedIterator =
    dep.aggregator().get().combineValuesByKey(...) then write0(combinedIterator). The original passed
    the expression inline (type inferred). Confirm write0 accepts the wildcard without an
    unchecked/raw warning regression.

An earlier review comment asked me to pass type parameter to the iterator. Now it is asking them it might not be a good idea. I'm ok with either.

Test coverage — too shallow for the risk

Drive a real writer with a partially-consumed iterator — is harder to construct realistically than it sounds. In normal flow, doWrite() consumes the iterator to exhaustion (while (records.hasNext())). For iteratorHasNext to be true after doWrite(), you'd need to simulate a framework-level anomaly: interrupt mid-loop, iterator returning spurious elements, or concurrent modification. That test ends up testing the mock more than the code.

…write

Adds a post-write safety check to HashBasedShuffleWriter and SortBasedShuffleWriter:
after the write loop completes, verify the input iterator was fully consumed.
If records remain, kill the task with TaskKilledException. This guards against
silent data loss.
…apperEnd

Move assertIteratorFullyConsumed before mapperEnd in all four shuffle
writer variants to ensure partial map outputs are never committed to
the shuffle service. Update comments to accurately describe each
variant's resource cleanup path (HashBased vs SortBased).
@xumingming xumingming force-pushed the iterator-fully-consumed-check branch from 6a9fc92 to 1d26dd3 Compare May 20, 2026 08:17
@SteNicholas SteNicholas force-pushed the main branch 2 times, most recently from 1d92a40 to cf8d472 Compare May 27, 2026 02:11
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants