[CELEBORN-2315] Add iterator fully-consumed validation after shuffle write#3672
[CELEBORN-2315] Add iterator fully-consumed validation after shuffle write#3672xumingming wants to merge 3 commits into
Conversation
5a50c71 to
dbd6473
Compare
|
@gauravkm @RexXiong @SteNicholas Could you also take a look at this one? |
|
@RexXiong @SteNicholas @gauravkm Gentle ping :) |
|
i’ll help take a look at this PR over the next couple days |
There was a problem hiding this comment.
Pull request overview
This PR adds a correctness guard to Celeborn’s Spark shuffle writers (Spark 2 and Spark 3 variants): after finishing the write path, it validates that the upstream records iterator was fully consumed and kills the task if it wasn’t, reducing the risk of silent data loss.
Changes:
- Add
SparkUtils.assertIteratorFullyConsumed(...)helper and invoke it at the end of shuffle-writer close paths. - Refactor Hash/Sort-based writers’ write flows to propagate an
iteratorHasNextsignal from the write loop to close/validation. - Extend
TaskInterruptedHelperto support an optional message inTaskKilledExceptionand add unit tests for the new assertion.
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java | Adds assertIteratorFullyConsumed helper (Spark 3). |
| client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java | Adds assertIteratorFullyConsumed helper (Spark 2). |
| client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java | Returns iterator-consumption status from doWrite and validates on close. |
| client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java | Same iterator-consumption validation wiring for hash-based writer. |
| client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java | Same iterator-consumption validation wiring (Spark 2). |
| client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java | Same iterator-consumption validation wiring (Spark 2). |
| client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/TaskInterruptedHelper.java | Adds overload to include a message in TaskKilledException. |
| client-spark/spark-3/src/test/java/org/apache/spark/shuffle/celeborn/CelebornShuffleWriterSuiteBase.java | Adds unit tests for the new iterator-consumed assertion and mocks kill reason. |
| client-spark/spark-2/src/test/java/org/apache/spark/shuffle/celeborn/CelebornShuffleWriterSuiteBase.java | Adds unit tests for the new iterator-consumed assertion and mocks kill reason. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
dbd6473 to
bba2479
Compare
|
@SteNicholas I have made all the necessary changes, can you take a look at again? |
|
@xumingming, please take a look at the comments from claude code: |
|
@SteNicholas Answers to Claude's comments:
This is actually a good catch. I have moved the assertIteratorFullyConsumed call site, so when we throw the TaskKilledException, the cleanup has not been done yet -- be consistent as original design. BTW: Currently the needCleanupPusher is very tricky and fragile, I'd like a add another PR to optimize it.
An earlier review comment asked me to pass type parameter to the iterator. Now it is asking them it might not be a good idea. I'm ok with either.
Drive a real writer with a partially-consumed iterator — is harder to construct realistically than it sounds. In normal flow, doWrite() consumes the iterator to exhaustion (while (records.hasNext())). For iteratorHasNext to be true after doWrite(), you'd need to simulate a framework-level anomaly: interrupt mid-loop, iterator returning spurious elements, or concurrent modification. That test ends up testing the mock more than the code. |
6a9fc92 to
1d26dd3
Compare
1d92a40 to
cf8d472
Compare
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## main #3672 +/- ##
==========================================
+ Coverage 66.91% 67.00% +0.09%
==========================================
Files 358 359 +1
Lines 21986 22226 +240
Branches 1946 1969 +23
==========================================
+ Hits 14710 14890 +180
- Misses 6262 6312 +50
- Partials 1014 1024 +10 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
|
Thanks for the work on this PR. The iterator-fully-consumed check is a good defensive guard, but it's worth noting that this only verifies the read side — i.e., all records were consumed from the iterator. It does not guarantee that all consumed records were actually shuffled out to the remote shuffle service successfully. The write pipeline has multiple stages where records could be silently lost after being consumed: A stronger correctness guarantee would be to verify that the number of records successfully written (e.g., Something like: if (inputRecordCount != writeMetrics.recordsWritten()) {
// fail the task before mapperEnd
}Note that for Would you consider adding a record count cross-check in this PR or as a follow-up JIRA? Reviewed with Claude Code |
@RexXiong Once the data enters the "serialize to buffer", the correctness is handled by CRC check, if any data is lost, it will be discovered by the CRC check mechanism, right? Fully-consumed validation is a supplement to the existing CRC check, to make sure all data is consumed before enter the "CRC covered zone". |
…write Adds a post-write safety check to HashBasedShuffleWriter and SortBasedShuffleWriter: after the write loop completes, verify the input iterator was fully consumed. If records remain, kill the task with TaskKilledException. This guards against silent data loss.
…apperEnd Move assertIteratorFullyConsumed before mapperEnd in all four shuffle writer variants to ensure partial map outputs are never committed to the shuffle service. Update comments to accurately describe each variant's resource cleanup path (HashBased vs SortBased).
1d26dd3 to
318d4f3
Compare
|
Makes sense — the CRC check covers the post-serialization path, and this PR fills the gap before that. LGTM. Reviewed with Claude Code |
|
One more thought — I'm not sure
An Reviewed with Claude Code |
… CelebornIOException Replace TaskKilledException with CelebornIOException in assertIteratorFullyConsumed. CelebornIOException extends IOException and fits the existing throws IOException contract of all write() methods cleanly, without needing an unchecked exception workaround. Also revert the throwTaskKillException(String message) overload added to TaskInterruptedHelper in CELEBORN-2315, which is now dead code.
|
Thanks. Merged to main(v0.7.0). |
What changes were proposed in this pull request?
Adds a post-write safety check to HashBasedShuffleWriter and SortBasedShuffleWriter: after the write loop completes, verify the input iterator was fully consumed. If records remain, kill the task with TaskKilledException. This guards against silent data loss.
Why are the changes needed?
It could give another layer of correctness guarantee.
Does this PR resolve a correctness bug?
Enhance correctness guarantee.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
UT.