[CELEBORN-2315] Add iterator fully-consumed validation after shuffle write#3672
[CELEBORN-2315] Add iterator fully-consumed validation after shuffle write#3672xumingming wants to merge 2 commits into
Conversation
5a50c71 to
dbd6473
Compare
|
@gauravkm @RexXiong @SteNicholas Could you also take a look at this one? |
|
@RexXiong @SteNicholas @gauravkm Gentle ping :) |
|
i’ll help take a look at this PR over the next couple days |
There was a problem hiding this comment.
Pull request overview
This PR adds a correctness guard to Celeborn’s Spark shuffle writers (Spark 2 and Spark 3 variants): after finishing the write path, it validates that the upstream records iterator was fully consumed and kills the task if it wasn’t, reducing the risk of silent data loss.
Changes:
- Add
SparkUtils.assertIteratorFullyConsumed(...)helper and invoke it at the end of shuffle-writer close paths. - Refactor Hash/Sort-based writers’ write flows to propagate an
iteratorHasNextsignal from the write loop to close/validation. - Extend
TaskInterruptedHelperto support an optional message inTaskKilledExceptionand add unit tests for the new assertion.
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java | Adds assertIteratorFullyConsumed helper (Spark 3). |
| client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java | Adds assertIteratorFullyConsumed helper (Spark 2). |
| client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java | Returns iterator-consumption status from doWrite and validates on close. |
| client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java | Same iterator-consumption validation wiring for hash-based writer. |
| client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java | Same iterator-consumption validation wiring (Spark 2). |
| client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java | Same iterator-consumption validation wiring (Spark 2). |
| client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/TaskInterruptedHelper.java | Adds overload to include a message in TaskKilledException. |
| client-spark/spark-3/src/test/java/org/apache/spark/shuffle/celeborn/CelebornShuffleWriterSuiteBase.java | Adds unit tests for the new iterator-consumed assertion and mocks kill reason. |
| client-spark/spark-2/src/test/java/org/apache/spark/shuffle/celeborn/CelebornShuffleWriterSuiteBase.java | Adds unit tests for the new iterator-consumed assertion and mocks kill reason. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
dbd6473 to
bba2479
Compare
|
@SteNicholas I have made all the necessary changes, can you take a look at again? |
|
@xumingming, please take a look at the comments from claude code: |
|
@SteNicholas Answers to Claude's comments:
This is actually a good catch. I have moved the assertIteratorFullyConsumed call site, so when we throw the TaskKilledException, the cleanup has not been done yet -- be consistent as original design. BTW: Currently the needCleanupPusher is very tricky and fragile, I'd like a add another PR to optimize it.
An earlier review comment asked me to pass type parameter to the iterator. Now it is asking them it might not be a good idea. I'm ok with either.
Drive a real writer with a partially-consumed iterator — is harder to construct realistically than it sounds. In normal flow, doWrite() consumes the iterator to exhaustion (while (records.hasNext())). For iteratorHasNext to be true after doWrite(), you'd need to simulate a framework-level anomaly: interrupt mid-loop, iterator returning spurious elements, or concurrent modification. That test ends up testing the mock more than the code. |
…write Adds a post-write safety check to HashBasedShuffleWriter and SortBasedShuffleWriter: after the write loop completes, verify the input iterator was fully consumed. If records remain, kill the task with TaskKilledException. This guards against silent data loss.
…apperEnd Move assertIteratorFullyConsumed before mapperEnd in all four shuffle writer variants to ensure partial map outputs are never committed to the shuffle service. Update comments to accurately describe each variant's resource cleanup path (HashBased vs SortBased).
6a9fc92 to
1d26dd3
Compare
1d92a40 to
cf8d472
Compare
What changes were proposed in this pull request?
Adds a post-write safety check to HashBasedShuffleWriter and SortBasedShuffleWriter: after the write loop completes, verify the input iterator was fully consumed. If records remain, kill the task with TaskKilledException. This guards against silent data loss.
Why are the changes needed?
It could give another layer of correctness guarantee.
Does this PR resolve a correctness bug?
Enhance correctness guarantee.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
UT