[CELEBORN-2329][CIP22] Encryption at Rest Spark Impl #3689
+CC @mridulm @rmcyang @FMX @SteNicholas PTAL, thanks.
Codecov Report
✅ All modified and coverable lines are covered by tests.
Coverage Diff
| Metric | main | #3689 | +/- |
|---|---|---|---|
| Coverage | 66.91% | 66.98% | +0.08% |
| Files | 358 | 359 | +1 |
| Lines | 21986 | 22218 | +232 |
| Branches | 1946 | 1968 | +22 |
| Hits | 14710 | 14881 | +171 |
| Misses | 6262 | 6315 | +53 |
| Partials | 1014 | 1022 | +8 |
Pull request overview
This PR adds Spark-side “encryption at rest” support by introducing a pluggable crypto interface in the Celeborn client, wiring encryption into push paths and decryption into fetch/read paths, and providing a Spark-backed implementation that uses Spark’s IO encryption settings/key.
Changes:
- Introduces CryptoHandler and threads an optional handler through ShuffleClient/ShuffleClientImpl into push (encrypt) and read (decrypt) code paths.
- Adds Spark implementation (SparkCryptoHandler) + plumbing (SparkCommonUtils.getCryptoHandler, Spark shuffle readers/managers updated to pass the handler).
- Updates tests and shaded artifacts/dependencies to include the needed crypto runtime.
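To make the wiring in the overview concrete, here is a minimal sketch of how an optional handler could sit on the push and read paths. The interface shape, method signatures, and every name ending in `Sketch` are assumptions for illustration; the PR's actual CryptoHandler signature is not shown in this conversation, and the XOR transform is a stand-in, not real encryption.

```java
import java.util.Optional;

// Hypothetical shape of a pluggable handler; the real CryptoHandler may differ.
interface CryptoHandlerSketch {
  byte[] encrypt(byte[] input, int offset, int length);

  byte[] decrypt(byte[] input, int offset, int length);
}

// Toy XOR transform used only to show the wiring. It is NOT secure.
final class XorHandlerSketch implements CryptoHandlerSketch {
  private final byte key;

  XorHandlerSketch(byte key) {
    this.key = key;
  }

  private byte[] xor(byte[] input, int offset, int length) {
    byte[] out = new byte[length];
    for (int i = 0; i < length; i++) {
      out[i] = (byte) (input[offset + i] ^ key);
    }
    return out;
  }

  @Override
  public byte[] encrypt(byte[] input, int offset, int length) {
    return xor(input, offset, length);
  }

  @Override
  public byte[] decrypt(byte[] input, int offset, int length) {
    return xor(input, offset, length);
  }
}

final class PushReadSketch {
  // Push path: transform the payload only when a handler is configured.
  static byte[] onPush(byte[] payload, Optional<CryptoHandlerSketch> handler) {
    return handler.map(h -> h.encrypt(payload, 0, payload.length)).orElse(payload);
  }

  // Read path: the inverse transform, again only when a handler is present.
  static byte[] onRead(byte[] stored, Optional<CryptoHandlerSketch> handler) {
    return handler.map(h -> h.decrypt(stored, 0, stored.length)).orElse(stored);
  }
}
```

With `Optional.empty()` both paths return the payload untouched, which is how callers without encryption (for example the updated tests) would keep their existing behavior.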
Reviewed changes
Copilot reviewed 20 out of 20 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| client/src/test/java/org/apache/celeborn/client/read/CelebornInputStreamPeerFailoverTest.java | Updates test calls to new CelebornInputStream.create signature with optional crypto handler. |
| client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java | Encrypts outgoing shuffle payloads when crypto is enabled; passes handler into partition reads. |
| client/src/main/java/org/apache/celeborn/client/ShuffleClient.java | Adds ShuffleClient.get(...) overload accepting an optional crypto handler; exposes setup hook. |
| client/src/main/java/org/apache/celeborn/client/security/CryptoHandler.java | New interface for encrypt/decrypt on byte arrays. |
| client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java | Decrypts incoming batches before optional decompression and consumption. |
| client/src/main/java/org/apache/celeborn/client/DummyShuffleClient.java | Adds no-op crypto handler setup for dummy client. |
| client/pom.xml | Adds commons-crypto dependency to client module. |
| client-spark/spark-3/src/test/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReaderSuite.scala | Updates static mock to match new ShuffleClient.get signature. |
| client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala | Extends constructors/plumbing to carry optional crypto handler to ShuffleClient. |
| client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java | Updates reflective columnar shuffle reader construction to pass optional crypto handler. |
| client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java | Passes Spark-derived crypto handler into writers/readers. |
| client-spark/spark-3-shaded/pom.xml | Ensures commons-crypto is included in shaded Spark 3 artifact. |
| client-spark/spark-3-columnar-shuffle/src/test/scala/org/apache/spark/shuffle/celeborn/CelebornColumnarShuffleReaderSuite.scala | Updates tests to pass Optional.empty() crypto handler. |
| client-spark/spark-3-columnar-shuffle/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornColumnarShuffleReader.scala | Threads optional crypto handler through columnar reader. |
| client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala | Threads optional crypto handler through Spark 2 reader. |
| client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java | Passes Spark-derived crypto handler into Spark 2 writers/readers. |
| client-spark/spark-2-shaded/pom.xml | Ensures commons-crypto is included in shaded Spark 2 artifact. |
| client-spark/common/src/test/java/org/apache/spark/shuffle/celeborn/SparkCryptoHandlerSuiteJ.java | Adds unit tests for Spark crypto handler behavior. |
| client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SparkCryptoHandler.java | Implements CryptoHandler using Spark CryptoStreamUtils. |
| client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SparkCommonUtils.java | Derives optional crypto handler from Spark config/env IO encryption key. |
PR Review: [CELEBORN-2329][CIP22] Encryption at Rest Spark Impl
Overall: Good architecture, some performance and safety concerns. The encrypt-at-client / decrypt-at-client approach is clean: workers store ciphertext, no key distribution needed to workers. Compress-then-encrypt ordering is correct.
Performance Concerns
1. Heavy allocation on hot path: every batch allocates multiple objects
For a typical shuffle with thousands of batches per second per executor, this generates significant GC pressure. Consider:
Moderate Issues
2. Non-Spark engines (Flink, MR) using the Celeborn client now pull in
3.
SparkCommonUtils.getCryptoHandler(conf) // called in getWriter
SparkCommonUtils.getCryptoHandler(conf) // called in getReader (multiple times for columnar path)
Each call accesses
Minor / Nits
4. No AES-GCM / integrity protection Spark's
5. Test coverage is good: roundtrip, wrong key, large data, empty data, offset. Could add a test for corrupted ciphertext (flip a byte in encrypted output, verify decrypt fails or produces wrong output).
6. Debug logging on every batch
logger.debug("Encrypted shuffle data for shuffle {} ...", shuffleId, ...);
logger.debug("Decrypted shuffle data for shuffle {} ...", shuffleId, ...);
Even with debug disabled, the varargs boxing and string formatting preparation happens. On a hot path with millions of batches, this adds up. Consider guarding with
Reviewed with Claude Code
I will address the above comments in 2 weeks, as I will be on vacation.
Force-pushed from 1d92a40 to cf8d472.
Review: Encryption at Rest (Spark)
Nice work. The wiring is clean and the integration points are well chosen. I traced the full write→store→read path and the design is sound. A few notes below, mostly non-blocking suggestions and a couple of questions.
What works well
Suggestions / questions
Overall this looks solid and well-scoped. The main thing I'd push on before merge is item (1): an integration test for the round-trip wiring.
- Add upper-bound check (decryptedLength > length - 4) in SparkCryptoHandler.decrypt() to guard against OOM from corrupted or wrong-key input
- Fix incBytesRead/incDuplicateBytesRead metric undercount in CelebornInputStream by tracking encryptedSize separately from the decrypted size
- Fix testEncryptWithOffset to actually exercise a non-zero offset (offset=10)
- Defensively handle null in ShuffleClientImpl.setupCryptoHandler to prevent NPE
- Add fallback 9-arg constructor in SparkUtils.ColumnarShuffleReaderConstructorHolder for backward compatibility with older columnar-shuffle modules
- Move commons-crypto dependency from client/pom.xml to client-spark/common/pom.xml so non-Spark engines (Flink, MR) no longer pull it in unnecessarily

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
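The upper-bound check in the first bullet can be sketched as follows. This assumes a frame that starts with a 4-byte big-endian plaintext length (suggested by the `length - 4` in the commit message); the actual layout used by SparkCryptoHandler is not shown here, and `LengthGuardSketch` is a hypothetical name.

```java
import java.nio.ByteBuffer;

final class LengthGuardSketch {
  // Assumed header: a 4-byte big-endian length of the decrypted payload.
  static final int HEADER_BYTES = 4;

  // Reads the declared plaintext length and validates it BEFORE any buffer
  // of that size is allocated, so garbage input cannot trigger a huge allocation.
  static int readDeclaredLength(byte[] frame, int offset, int length) {
    if (length < HEADER_BYTES) {
      throw new IllegalArgumentException("frame too short: " + length + " bytes");
    }
    int declared = ByteBuffer.wrap(frame, offset, HEADER_BYTES).getInt();
    // A corrupted frame or a wrong key can yield any 32-bit value here.
    if (declared < 0 || declared > length - HEADER_BYTES) {
      throw new IllegalArgumentException(
          "implausible decrypted length " + declared + " for a frame of " + length + " bytes");
    }
    return declared;
  }
}
```

The bound works because the decrypted payload can never be longer than the bytes that follow the header, so any larger value can be rejected without attempting the allocation.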
…uffleReader

spark-3.5-columnar-shuffle and spark-4-columnar-shuffle test suites call createColumnarShuffleReader without the Optional<CryptoHandler> parameter, causing a compile error. Add a 9-arg overload that defaults to Optional.empty() so existing callers don't need to be updated.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Hi @mridulm @RexXiong @SteNicholas I addressed the above comments. Seems like some of the tests are failing due to transient errors, can you please help to re-run them?
- Cache getCryptoHandler() result in SparkShuffleManager (Spark 2 + 3) to avoid re-reading and re-parsing the config on every shuffle read/write call; uses a volatile field for safe lazy initialization
- Add CelebornInputStreamCryptoRoundTripSuiteJ: integration-style test that exercises the full encrypt-on-write / decrypt-on-read path in CelebornInputStream, including compress+encrypt ordering, integrity check compatibility, and large payloads

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
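A volatile-backed lazy cache of the kind the first bullet describes might look like the sketch below. The class name and the generic `Supplier` loader are hypothetical, and the double-checked locking is one possible choice; the PR may simply tolerate a benign race instead.

```java
import java.util.Optional;
import java.util.function.Supplier;

final class CachedHandlerSketch<T> {
  private final Supplier<Optional<T>> loader;

  // null means "not resolved yet"; an empty Optional means "resolved, encryption off".
  private volatile Optional<T> cached;

  CachedHandlerSketch(Supplier<Optional<T>> loader) {
    this.loader = loader;
  }

  Optional<T> get() {
    Optional<T> local = cached; // single volatile read on the fast path
    if (local == null) {
      synchronized (this) {
        local = cached;
        if (local == null) {
          local = loader.get(); // e.g. parse config and build the handler, once
          cached = local;
        }
      }
    }
    return local;
  }
}
```

Caching the `Optional` itself, rather than the handler, lets the "encryption disabled" result be cached too, so the config is not re-read on every call in that case either.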
SteNicholas left a comment
Encryption-at-Rest (EAR) Spark impl — review
Reviewed at the current head (9321d70). The core compress→encrypt / decrypt→decompress path is symmetric and sound, the integrity CRC correctly round-trips over plaintext on both write and read ends, the IV is fresh per batch, and the empty-payload case works (Spark writes the IV eagerly). The two latest commits also resolve earlier concerns: getCryptoHandler is now cached per SparkShuffleManager, and CelebornInputStreamCryptoRoundTripSuiteJ adds real read-path round-trip coverage (incl. compress+encrypt and integrity-check).
One blocking issue and several smaller items inline.
Blocking: the spark-3.5 / spark-4 columnar readers were not updated for the new cryptoHandler parameter, so on those profiles encryption silently isn't applied on read (inline at SparkUtils).
Two cross-cutting notes that don't map to a single line:
- Encryption state has two sources of truth: the write side reads ShuffleClientImpl.cryptoHandler, the read side reads the per-stream Optional threaded into CelebornInputStreamImpl. Fine today, easy to desync in a future refactor.
- The new round-trip test uses a mock XorCryptoHandler against CelebornInputStream directly; it does not exercise the columnar reader construction path, so it won't catch the blocking issue below.
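For readers following the review, the symmetric compress→encrypt / decrypt→decompress path with a fresh IV per batch and a CRC over plaintext can be illustrated with a self-contained sketch. It uses the JDK's `javax.crypto` and `java.util.zip` rather than Spark's CryptoStreamUtils or commons-crypto, and the `IV || ciphertext` layout, the AES/CTR choice, and all names are assumptions made for the example.

```java
import java.io.ByteArrayOutputStream;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import java.util.Arrays;
import java.util.zip.CRC32;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;
import javax.crypto.Cipher;
import javax.crypto.spec.IvParameterSpec;
import javax.crypto.spec.SecretKeySpec;

final class BatchCodecSketch {
  private static final int IV_BYTES = 16;
  private static final SecureRandom RANDOM = new SecureRandom();
  private final SecretKeySpec key;

  BatchCodecSketch(byte[] rawKey) {
    this.key = new SecretKeySpec(rawKey, "AES");
  }

  // The integrity checksum is taken over plaintext on both ends.
  static long crc(byte[] plaintext) {
    CRC32 c = new CRC32();
    c.update(plaintext, 0, plaintext.length);
    return c.getValue();
  }

  // Write side: compress first, then encrypt with a fresh random IV.
  byte[] write(byte[] plaintext) {
    byte[] compressed = deflate(plaintext);
    byte[] iv = new byte[IV_BYTES];
    RANDOM.nextBytes(iv);
    byte[] body = crypt(Cipher.ENCRYPT_MODE, iv, compressed, 0, compressed.length);
    byte[] out = Arrays.copyOf(iv, IV_BYTES + body.length); // IV || ciphertext
    System.arraycopy(body, 0, out, IV_BYTES, body.length);
    return out;
  }

  // Read side: the exact inverse order, decrypt first, then decompress.
  byte[] read(byte[] stored) {
    byte[] iv = Arrays.copyOfRange(stored, 0, IV_BYTES);
    return inflate(crypt(Cipher.DECRYPT_MODE, iv, stored, IV_BYTES, stored.length - IV_BYTES));
  }

  private byte[] crypt(int mode, byte[] iv, byte[] in, int off, int len) {
    try {
      Cipher cipher = Cipher.getInstance("AES/CTR/NoPadding");
      cipher.init(mode, key, new IvParameterSpec(iv));
      return cipher.doFinal(in, off, len);
    } catch (GeneralSecurityException e) {
      throw new IllegalStateException(e);
    }
  }

  private static byte[] deflate(byte[] in) {
    Deflater d = new Deflater();
    d.setInput(in);
    d.finish();
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    byte[] buf = new byte[4096];
    while (!d.finished()) {
      bos.write(buf, 0, d.deflate(buf));
    }
    d.end();
    return bos.toByteArray();
  }

  private static byte[] inflate(byte[] in) {
    Inflater inf = new Inflater();
    try {
      inf.setInput(in);
      ByteArrayOutputStream bos = new ByteArrayOutputStream();
      byte[] buf = new byte[4096];
      while (!inf.finished()) {
        int n = inf.inflate(buf);
        if (n == 0 && (inf.needsInput() || inf.needsDictionary())) {
          throw new IllegalStateException("truncated or corrupt batch");
        }
        bos.write(buf, 0, n);
      }
      return bos.toByteArray();
    } catch (DataFormatException e) {
      throw new IllegalStateException(e);
    } finally {
      inf.end();
    }
  }
}
```

Compressing before encrypting matters because ciphertext is effectively incompressible; reversing the order would pay the compression cost for no size benefit. The sketch, like CTR mode in general, provides confidentiality only and does not authenticate the ciphertext.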
- Add cryptoHandler param to spark-3.5 and spark-4 columnar readers so encryption is no longer silently dropped on read for those profiles. Remove the now-unnecessary 9-arg backward-compat overload of createColumnarShuffleReader and fallback .impl in SparkUtils.
- Move dedup/stale-attempt checks before decryption in CelebornInputStream.fillBuffer() to avoid paying the AES cost for batches that will be discarded. Use skipBytes() for skipped batches.
- Fix encryptedBuf initial sizing to include headerLen headroom (same as compressedBuf/rawDataBuf) to prevent per-batch reallocation when compression is enabled.
- Null out encryptedBuf in close() for consistency with other buffers.
- Mark cryptoHandler volatile in ShuffleClientImpl for safe publication.
- Guard decrypt debug log with isDebugEnabled() check.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
- Snapshot volatile cryptoHandler to a local var in pushOrMergeData to avoid a TOCTOU race between isPresent() and get() on the volatile field
- Fix inaccurate comment in CelebornInputStream.init() that described SparkCryptoHandler-specific layout instead of the generic contract
- Fix ByteBuf double-retain in CelebornInputStreamCryptoRoundTripSuiteJ: duplicate() shares the buffer without bumping the ref count, so the stream's single release correctly frees it
- Add length < 4 guard in XorCryptoHandler.decrypt before the unsafe Platform.getInt read

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
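The snapshot-to-local pattern from the first bullet is shown below in isolation. The class, the field, and the use of `UnaryOperator<byte[]>` in place of the real handler type are hypothetical; only the pattern itself is taken from the commit message.

```java
import java.util.Optional;
import java.util.function.UnaryOperator;

final class SnapshotSketch {
  // May be replaced by another thread at any time.
  private volatile Optional<UnaryOperator<byte[]>> encryptor = Optional.empty();

  // Null-tolerant setup: a null argument simply disables the transform.
  void setup(UnaryOperator<byte[]> e) {
    encryptor = Optional.ofNullable(e);
  }

  byte[] push(byte[] payload) {
    // Read the volatile field exactly once. Calling encryptor.isPresent() and
    // then encryptor.get() would be two reads, and the field could change
    // between them (time-of-check to time-of-use).
    Optional<UnaryOperator<byte[]>> local = encryptor;
    if (local.isPresent()) {
      return local.get().apply(payload);
    }
    return payload;
  }
}
```

Because `Optional` instances are immutable, the local snapshot stays consistent for the rest of the method even if another thread calls `setup` concurrently.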
What changes were proposed in this pull request?
Adds EAR support for Spark side.
See more details in doc.
Why are the changes needed?
See more details in doc.
Does this PR resolve a correctness bug?
no
Does this PR introduce any user-facing change?
no
How was this patch tested?
Unit tests; also tested internally in production.