
[CELEBORN-2315] Add iterator fully-consumed validation after shuffle write#3672

Closed
xumingming wants to merge 3 commits into apache:main from xumingming:iterator-fully-consumed-check

Conversation

@xumingming

@xumingming xumingming commented Apr 23, 2026

Contributor

What changes were proposed in this pull request?

Adds a post-write safety check to HashBasedShuffleWriter and SortBasedShuffleWriter: after the write loop completes, verify the input iterator was fully consumed. If records remain, kill the task with TaskKilledException. This guards against silent data loss.
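The check described above can be sketched in isolation. This is a minimal illustration, not the PR's code: the class `IteratorConsumptionSketch`, the method `writeAtMost`, and the use of `IllegalStateException` are assumptions made for the example (the PR as first proposed kills the task with `TaskKilledException`), and a plain `java.util.Iterator` stands in for the Scala iterator the writers receive.

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for the writer logic; not the actual Celeborn code.
final class IteratorConsumptionSketch {

  // Simulates a write loop that stops after at most `limit` records and
  // reports whether the input iterator still has records left afterwards.
  static <T> boolean writeAtMost(Iterator<T> records, int limit, List<T> sink) {
    int written = 0;
    while (written < limit && records.hasNext()) {
      sink.add(records.next());
      written++;
    }
    return records.hasNext();
  }

  // Fails loudly if records remain after the write loop.
  // IllegalStateException is an assumption made for this sketch.
  static void assertIteratorFullyConsumed(boolean iteratorHasNext) {
    if (iteratorHasNext) {
      throw new IllegalStateException(
          "input iterator was not fully consumed after shuffle write");
    }
  }
}
```

A loop that drains its input returns `false` and the assertion passes; a loop that stops early returns `true` and the assertion throws, so the remaining records cannot be dropped silently.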

Why are the changes needed?

It adds another layer of correctness guarantee.

Does this PR resolve a correctness bug?

It enhances the correctness guarantee.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

UT.

@xumingming xumingming force-pushed the iterator-fully-consumed-check branch 2 times, most recently from 5a50c71 to dbd6473 Compare April 23, 2026 12:33
@xumingming

Contributor Author

@gauravkm @RexXiong @SteNicholas Could you also take a look at this one?

@xumingming

Contributor Author

@RexXiong @SteNicholas @gauravkm Gentle ping :)

@afterincomparableyum

Contributor

I’ll help take a look at this PR over the next couple of days.

Copilot AI left a comment

Pull request overview

This PR adds a correctness guard to Celeborn’s Spark shuffle writers (Spark 2 and Spark 3 variants): after finishing the write path, it validates that the upstream records iterator was fully consumed and kills the task if it wasn’t, reducing the risk of silent data loss.

Changes:

  • Add SparkUtils.assertIteratorFullyConsumed(...) helper and invoke it at the end of shuffle-writer close paths.
  • Refactor Hash/Sort-based writers’ write flows to propagate an iteratorHasNext signal from the write loop to close/validation.
  • Extend TaskInterruptedHelper to support an optional message in TaskKilledException and add unit tests for the new assertion.

Reviewed changes

Copilot reviewed 9 out of 9 changed files in this pull request and generated 8 comments.

Show a summary per file
| File | Description |
| --- | --- |
| client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java | Adds assertIteratorFullyConsumed helper (Spark 3). |
| client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java | Adds assertIteratorFullyConsumed helper (Spark 2). |
| client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java | Returns iterator-consumption status from doWrite and validates on close. |
| client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java | Same iterator-consumption validation wiring for hash-based writer. |
| client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java | Same iterator-consumption validation wiring (Spark 2). |
| client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java | Same iterator-consumption validation wiring (Spark 2). |
| client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/TaskInterruptedHelper.java | Adds overload to include a message in TaskKilledException. |
| client-spark/spark-3/src/test/java/org/apache/spark/shuffle/celeborn/CelebornShuffleWriterSuiteBase.java | Adds unit tests for the new iterator-consumed assertion and mocks kill reason. |
| client-spark/spark-2/src/test/java/org/apache/spark/shuffle/celeborn/CelebornShuffleWriterSuiteBase.java | Adds unit tests for the new iterator-consumed assertion and mocks kill reason. |

@xumingming xumingming force-pushed the iterator-fully-consumed-check branch from dbd6473 to bba2479 Compare May 13, 2026 09:06
@xumingming

Contributor Author

@SteNicholas I have made all the necessary changes. Could you take another look?

@SteNicholas

Member

@xumingming, please take a look at the comments from claude code:

Code Review — [CELEBORN-2315] Add iterator fully-consumed validation after shuffle write 
  (apache/celeborn#3672)

  Overview

  Adds a post-write safety check to HashBasedShuffleWriter and SortBasedShuffleWriter (both spark-2
  and spark-3): doWrite() now returns whether the input iterator still has records, and
  close(iteratorHasNext) calls SparkUtils.assertIteratorFullyConsumed(...) which kills the task with
   TaskKilledException if records remain. TaskInterruptedHelper.throwTaskKillException is extended
  to carry an optional message merged with Spark's kill reason. +171/-46 across 9 files.

  Correctness — solid, with points to verify

  - Check placement is well-reasoned and correct. In every writer the assertion is placed after the
  pusher is drained / buffers returned and before shuffleClient.mapperEnd(...). This is the key
  invariant — a partial map output is never committed to the shuffle service — and it's right.
  - mapSideCombine handling is a genuine improvement. Capturing combinedIterator and checking
  combinedIterator.hasNext() (rather than the raw records) is the correct level: write0 consumes the
   combined iterator, and its exhaustion implies the input was drained by the aggregator.
  - Null/reason merging in throwTaskKillException(String) covers all four combinations cleanly.
  - ⚠️  finally { if (needCleanupPusher) cleanupPusher(); } still runs when the assertion throws.
  TaskKilledException isn't an InterruptedException, so it bypasses the catch; needCleanupPusher is
  still true, so cleanupPusher() executes after the pusher already terminated. Two things to
  confirm: (1) cleanupPusher() is safe/idempotent post-termination, and (2) if it throws
  IOException, that exception will supersede the TaskKilledException (Java finally semantics),
  masking the real kill reason. Worth a comment or guarding.
  - ⚠️  Generics: scala.collection.Iterator<?> combinedIterator = 
  dep.aggregator().get().combineValuesByKey(...) then write0(combinedIterator). The original passed
  the expression inline (type inferred). Confirm write0 accepts the wildcard without an
  unchecked/raw warning regression.
  - Minor: records.hasNext() is assumed side-effect-free/idempotent. True for standard Spark
  iterators; fine to rely on but worth being aware of for exotic interruptible/completion iterators.

  Behavior change / risk

  - This converts a silent condition into a hard task kill. For a ShuffleMapTask the writer must
  consume the full iterator, so a non-empty iterator genuinely indicates data loss — semantically
  correct. But any false positive (a custom iterator whose hasNext() blocks/recomputes, or an
  unforeseen early-termination path) would now fail previously-"successful" tasks.
  - Suggestion: gate the assertion behind a CelebornConf flag (default on) so it can be disabled
  quickly if false positives surface in production. Low cost, much safer rollout for a correctness
  guard touching every shuffle write.

  Test coverage — too shallow for the risk

  - The only new tests call SparkUtils.assertIteratorFullyConsumed(false/true) directly. They do not
   cover the actual integration: that a real writer with a partially-consumed iterator triggers the
  kill, that the normal full-consumption path does not falsely trip, that the kill happens before
  mapperEnd, or the new throwTaskKillException(String) branches (spark reason + message).
  - Recommend at least: (1) an assertion in the existing check(...) writer suites that a normal full
   write does not throw (guards against false positives across fast-write / combine / plain paths);
  (2) a test driving a writer/doWrite with an iterator left non-empty and asserting
  TaskKilledException is raised before mapperEnd.
  
  Consistency / style

  - @VisibleForTesting and the explanatory return-contract comment are present only on spark-3 
  SortBasedShuffleWriter.doWrite; the new doWrite in spark-2 Sort and both Hash writers have
  neither. Make the four doWrite methods consistent (annotation + one-line "returns true if not
  fully consumed" doc).
  - The "why this placement" comment exists on the Hash writers but not on either
  SortBasedShuffleWriter before assertIteratorFullyConsumed, even though placement-before-mapperEnd
  matters equally there. Add the same rationale so a future refactor doesn't move it past mapperEnd.
  - spark-3 test adds dependency.mapSideCombine()/aggregator() mocks but spark-2 test does not —
  confirm spark-2 suite still exercises the new doWrite branch (or is relying on a real dependency).

  Verdict
  
  Sound, well-placed change with correct core logic and good handling of the combine path. No
  blocking bug found. Before merge I'd want: a config gate for safe rollout, deeper
  (integration-level) tests of the guard and the new message branches, the finally/cleanupPusher
  interaction confirmed, and the cross-writer @VisibleForTesting/comment consistency tidied.
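The finally/cleanupPusher concern raised in the review above rests on standard Java semantics: an exception thrown from a `finally` block replaces the exception that was already propagating. The following is a minimal sketch of that behavior and of one possible guard. All names here (`FinallyMaskingSketch`, `CheckFailed`, `cleanup`) are hypothetical and do not come from the Celeborn code base.

```java
import java.io.IOException;

final class FinallyMaskingSketch {

  // Stand-in for the exception raised by the consumption check.
  static final class CheckFailed extends RuntimeException {
    CheckFailed(String message) {
      super(message);
    }
  }

  // Hypothetical cleanup step that can be made to fail on demand.
  static void cleanup(boolean fail) throws IOException {
    if (fail) {
      throw new IOException("cleanup failed");
    }
  }

  // Unguarded: if cleanup throws, its IOException supersedes CheckFailed.
  static void unguarded(boolean cleanupFails) throws IOException {
    try {
      throw new CheckFailed("iterator not fully consumed");
    } finally {
      cleanup(cleanupFails);
    }
  }

  // Guarded: the primary exception survives and the cleanup failure is
  // attached to it as a suppressed exception.
  static void guarded(boolean cleanupFails) throws IOException {
    CheckFailed primary = null;
    try {
      throw new CheckFailed("iterator not fully consumed");
    } catch (CheckFailed e) {
      primary = e;
      throw e;
    } finally {
      try {
        cleanup(cleanupFails);
      } catch (IOException e) {
        if (primary != null) {
          primary.addSuppressed(e);
        } else {
          throw e;
        }
      }
    }
  }
}
```

With a failing cleanup, `unguarded(true)` surfaces only the `IOException`, while `guarded(true)` surfaces `CheckFailed` with the `IOException` recorded as suppressed.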

Copilot AI left a comment

Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.

@xumingming

Contributor Author

@SteNicholas Answers to Claude's comments:

  • ⚠️ finally { if (needCleanupPusher) cleanupPusher(); } still runs when the assertion throws.
    TaskKilledException isn't an InterruptedException, so it bypasses the catch; needCleanupPusher is
    still true, so cleanupPusher() executes after the pusher already terminated. Two things to
    confirm: (1) cleanupPusher() is safe/idempotent post-termination, and (2) if it throws
    IOException, that exception will supersede the TaskKilledException (Java finally semantics),
    masking the real kill reason. Worth a comment or guarding.

This is actually a good catch. I have moved the assertIteratorFullyConsumed call site so that when we throw the TaskKilledException, the cleanup has not been done yet, which is consistent with the original design.

BTW: the needCleanupPusher logic is currently tricky and fragile; I'd like to open a separate PR to clean it up.

  • ⚠️ Generics: scala.collection.Iterator<?> combinedIterator =
    dep.aggregator().get().combineValuesByKey(...) then write0(combinedIterator). The original passed
    the expression inline (type inferred). Confirm write0 accepts the wildcard without an
    unchecked/raw warning regression.

An earlier review comment asked me to add a type parameter to the iterator. Now this one suggests that might not be a good idea. I'm OK with either.

Test coverage — too shallow for the risk

Driving a real writer with a partially consumed iterator is harder to construct realistically than it sounds. In the normal flow, doWrite() consumes the iterator to exhaustion (while (records.hasNext())). For iteratorHasNext to be true after doWrite(), you'd need to simulate a framework-level anomaly: an interrupt mid-loop, an iterator returning spurious elements, or concurrent modification. Such a test ends up testing the mock more than the code.

@xumingming xumingming force-pushed the iterator-fully-consumed-check branch from 6a9fc92 to 1d26dd3 Compare May 20, 2026 08:17
@SteNicholas SteNicholas force-pushed the main branch 2 times, most recently from 1d92a40 to cf8d472 Compare May 27, 2026 02:11
@SteNicholas SteNicholas requested a review from Copilot June 2, 2026 11:59

Copilot AI left a comment

Pull request overview

Copilot reviewed 9 out of 9 changed files in this pull request and generated no new comments.

@codecov

codecov Bot commented Jun 2, 2026


Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 67.00%. Comparing base (b4cb5a0) to head (1d26dd3).
⚠️ Report is 52 commits behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3672      +/-   ##
==========================================
+ Coverage   66.91%   67.00%   +0.09%     
==========================================
  Files         358      359       +1     
  Lines       21986    22226     +240     
  Branches     1946     1969      +23     
==========================================
+ Hits        14710    14890     +180     
- Misses       6262     6312      +50     
- Partials     1014     1024      +10     

@RexXiong

RexXiong commented Jun 8, 2026

Contributor

Thanks for the work on this PR. The iterator-fully-consumed check is a good defensive guard, but it's worth noting that this only verifies the read side — i.e., all records were consumed from the iterator. It does not guarantee that all consumed records were actually shuffled out to the remote shuffle service successfully.

The write pipeline has multiple stages where records could be silently lost after being consumed:

iterator.next()  →  serialize to buffer  →  dataPusher async push  →  shuffle service worker
       ↑                                                                        
   PR checks here                                          records could be lost anywhere after

A stronger correctness guarantee would be to verify that the number of records successfully written (e.g., writeMetrics.recordsWritten()) matches the number of records consumed from the input iterator. This would catch silent drops within the client-side pipeline (buffer management bugs, serialization edge cases, etc.).

Something like:

if (inputRecordCount != writeMetrics.recordsWritten()) {
    // fail the task before mapperEnd
}

Note that for mapSideCombine, the comparison should be done against the combined iterator's output count rather than the raw input count.

Would you consider adding a record count cross-check in this PR or as a follow-up JIRA?
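One way the suggested cross-check could look is sketched below. It is only an illustration under stated assumptions: `CountingIterator` and `RecordCountCheckSketch` are hypothetical names, the `written` argument stands in for whatever counter the writer maintains (for example the value of `writeMetrics.recordsWritten()` mentioned above), and `IOException` is chosen arbitrarily as the failure type.

```java
import java.io.IOException;
import java.util.Iterator;

// Wraps an iterator and counts how many records have been handed out.
final class CountingIterator<T> implements Iterator<T> {
  private final Iterator<T> delegate;
  private long count;

  CountingIterator(Iterator<T> delegate) {
    this.delegate = delegate;
  }

  @Override
  public boolean hasNext() {
    return delegate.hasNext();
  }

  @Override
  public T next() {
    T value = delegate.next();
    count++; // incremented only after the delegate returned a record
    return value;
  }

  long count() {
    return count;
  }
}

final class RecordCountCheckSketch {

  // Fails if the number of consumed records differs from the number written.
  static void assertCountsMatch(long consumed, long written) throws IOException {
    if (consumed != written) {
      throw new IOException(
          "record count mismatch: consumed=" + consumed + ", written=" + written);
    }
  }
}
```

For the mapSideCombine case, the wrapper would go around the combined iterator rather than the raw input, so that both counts refer to the same stream of records.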

Reviewed with Claude Code

@xumingming

xumingming commented Jun 8, 2026

Contributor Author
 iterator.next()  →  serialize to buffer  →  dataPusher async push  →  shuffle service worker
        ↑                                                                        
    PR checks here                                          records could be lost anywhere after

@RexXiong Once the data enters the "serialize to buffer" stage, correctness is handled by the CRC check: if any data is lost, the CRC check mechanism will discover it, right? Fully-consumed validation supplements the existing CRC check by making sure all data is consumed before it enters the "CRC-covered zone".

…write

Adds a post-write safety check to HashBasedShuffleWriter and SortBasedShuffleWriter:
after the write loop completes, verify the input iterator was fully consumed.
If records remain, kill the task with TaskKilledException. This guards against
silent data loss.
…apperEnd

Move assertIteratorFullyConsumed before mapperEnd in all four shuffle
writer variants to ensure partial map outputs are never committed to
the shuffle service. Update comments to accurately describe each
variant's resource cleanup path (HashBased vs SortBased).
@xumingming xumingming force-pushed the iterator-fully-consumed-check branch from 1d26dd3 to 318d4f3 Compare June 9, 2026 07:52
@RexXiong

Contributor

Makes sense — the CRC check covers the post-serialization path, and this PR fills the gap before that. LGTM.

Reviewed with Claude Code

@RexXiong RexXiong left a comment

Contributor

LGTM

@RexXiong

Contributor

One more thought — I'm not sure TaskKilledException is the right exception type here.

TaskKilledException semantically means "this task was killed externally" (speculative execution cancellation, stage abort, etc.). But iterator-not-fully-consumed is a data correctness issue, not a kill event. Using TaskKilledException here has a few consequences:

  1. Spark scheduler may not retry: TaskKilledException is treated as a legitimate kill, not a task failure. The scheduler may skip retry, silently swallowing a real data integrity problem.
  2. Misleading metrics/logs — ops sees "TaskKilled" and assumes speculative execution or stage cancellation, not a data bug.
  3. task.killed metric pollution — inflates kill counts with events that aren't actual kills.

An IOException or CelebornIOException would be more appropriate — it signals a genuine failure, triggers normal retry, and shows up correctly in monitoring.

Reviewed with Claude Code

… CelebornIOException

Replace TaskKilledException with CelebornIOException in assertIteratorFullyConsumed.
CelebornIOException extends IOException and fits the existing throws IOException contract
of all write() methods cleanly, without needing an unchecked exception workaround.

Also revert the throwTaskKillException(String message) overload added to
TaskInterruptedHelper in CELEBORN-2315, which is now dead code.
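
The point about the `throws IOException` contract can be illustrated with a small sketch. It is not the merged code: `ShuffleWriteIOException` is a hypothetical stand-in for `CelebornIOException`, and `CheckedFailureSketch.write` is a simplified write method invented for the example. It shows only that a checked exception extending `IOException` propagates through a method declared `throws IOException` without any wrapping.

```java
import java.io.IOException;
import java.util.Iterator;

final class CheckedFailureSketch {

  // Hypothetical stand-in for CelebornIOException: a checked exception
  // that extends IOException.
  static final class ShuffleWriteIOException extends IOException {
    ShuffleWriteIOException(String message) {
      super(message);
    }
  }

  // Throws the checked exception when records remain in the input.
  static void assertIteratorFullyConsumed(boolean iteratorHasNext)
      throws ShuffleWriteIOException {
    if (iteratorHasNext) {
      throw new ShuffleWriteIOException("input iterator was not fully consumed");
    }
  }

  // Simplified write method: its existing `throws IOException` clause
  // already covers the subclass, so the signature does not change.
  static void write(Iterator<String> records, StringBuilder sink) throws IOException {
    while (records.hasNext()) {
      sink.append(records.next());
    }
    assertIteratorFullyConsumed(records.hasNext());
  }
}
```

Callers that already handle `IOException` from the write path see the new failure as an ordinary write error.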
@SteNicholas

Member

Thanks. Merged to main (v0.7.0).


5 participants