Conversation
Add spark4_version (4.0.2) to BeamModulePlugin alongside the existing
spark3_version. Update spark_runner.gradle to conditionally select the
correct Scala library (2.13 vs 2.12), Jackson module, Kafka test
dependency, and require Java 17 when building against Spark 4.
Register the new :runners:spark:4 module in settings.gradle.kts.
These changes are purely additive — all conditionals gate on
spark_version.startsWith("4") or spark_scala_version == '2.13', leaving
the Spark 3 build path untouched.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add the Gradle build file for the Spark 4 structured streaming runner. The module mirrors runners/spark/3/ — it inherits the shared RDD-based source from runners/spark/src/ via copySourceBase and adds its own Structured Streaming implementation in src/main/java. Key differences from the Spark 3 build:
- Uses spark4_version (4.0.2) with Scala 2.13.
- Excludes DStream-based streaming tests (the Spark 4 runner supports only batch via structured streaming).
- Unconditionally adds the --add-opens JVM flags required by Kryo on Java 17 (Spark 4's minimum).
- Binds the Spark driver to 127.0.0.1 for macOS compatibility.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
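For context on the Java 17 flags mentioned above: the build file itself is not shown here, and the exact set it configures may differ, but the flags in question are of this representative form (Spark's documented module-access requirements for Kryo/Unsafe on Java 17+):

```
--add-opens=java.base/java.lang=ALL-UNNAMED
--add-opens=java.base/java.lang.invoke=ALL-UNNAMED
--add-opens=java.base/java.io=ALL-UNNAMED
--add-opens=java.base/java.nio=ALL-UNNAMED
--add-opens=java.base/java.util=ALL-UNNAMED
--add-opens=java.base/sun.nio.ch=ALL-UNNAMED
```

Treat this as an illustrative subset, not the module's actual configuration.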
Add the Spark 4 structured streaming runner implementation and tests. Most files are adapted from the Spark 3 structured streaming runner with targeted changes for Spark 4 / Scala 2.13 API compatibility. Key Spark 4-specific changes (diff against runners/spark/3/src/):
- EncoderFactory — replaced the direct ExpressionEncoder constructor (removed in Spark 4) with BeamAgnosticEncoder, a named class implementing both AgnosticExpressionPathEncoder (for expression delegation via toCatalyst/fromCatalyst) and AgnosticEncoders.StructEncoder (so Dataset.select(TypedColumn) creates an N-attribute plan, preventing FIELD_NUMBER_MISMATCH). The toCatalyst/fromCatalyst methods substitute the provided input expression via transformUp, enabling correct nesting inside composite encoders like Encoders.tuple().
- EncoderHelpers — added a toExpressionEncoder() helper to handle Spark 4 built-in encoders that are AgnosticEncoder subclasses rather than ExpressionEncoder.
- GroupByKeyTranslatorBatch — migrated from the internal Catalyst Expression API (CreateNamedStruct, Literal$) to the public Column API (struct(), lit(), array()), as required by Spark 4.
- BoundedDatasetFactory — use classic.Dataset$.MODULE$.ofRows(), as Dataset moved to org.apache.spark.sql.classic in Spark 4.
- ScalaInterop — replace WrappedArray.ofRef (removed in Scala 2.13) with JavaConverters.asScalaBuffer().toList() in seqOf().
- GroupByKeyHelpers, CombinePerKeyTranslatorBatch — replace TraversableOnce with IterableOnce (the Scala 2.13 rename).
- SparkStructuredStreamingPipelineResult — replace sparkproject.guava with Beam's vendored Guava.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Add GitHub Actions workflows for the Spark 4 runner module:
- beam_PreCommit_Java_Spark4_Versions: runs sparkVersionsTest on changes to runners/spark/**. Currently a no-op (the sparkVersions map is empty) but scaffolds future patch-version coverage.
- beam_PostCommit_Java_ValidatesRunner_Spark4StructuredStreaming: runs the structured streaming test suite on Java 17.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Remove the endOfData() call in the close() method.
Add job-server and container build configurations for Spark 4, mirroring the existing Spark 3 job-server setup. The container uses eclipse-temurin:17 (Spark 4 requires Java 17). The shared spark_job_server.gradle gains a requireJavaVersion conditional for Spark 4 parent projects. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The hostname binding hack is no longer needed now that the local machine resolves its hostname to 127.0.0.1 via /etc/hosts. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Called out in /ultrareview as a missing contributor checklist item. Adds a Highlight line and a New Features / Improvements entry under the 2.74.0 Unreleased section, referencing issue apache#36841.
Per /ultrareview feedback: the one-line comment didn't make clear why the cast is safe. Expand it to note that SparkSession.builder() always returns a classic.SparkSession at runtime, which is why the downcast avoids reflection.
Per /ultrareview feedback: the fallback branch silently swallowed the second ClassNotFoundException. In practice one of the two classes is always present (Scala 2.12 vs 2.13 stdlib), but a silent skip could mask a broken classpath. Emit a LOG.warn instead.
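The pattern described above can be sketched as follows. The class and logger names are placeholders (the actual shared-base code and the specific Scala classes it probes are not shown here); the point is the shape: try the Scala 2.12 class, fall back to the 2.13 one, and warn — rather than silently return — when both lookups fail.

```java
import java.util.Optional;
import java.util.logging.Logger;

public class ClasspathProbe {
    private static final Logger LOG = Logger.getLogger(ClasspathProbe.class.getName());

    // Probe a Scala 2.12 class, fall back to its 2.13 counterpart.
    // Names are hypothetical; only the control flow mirrors the fix described above.
    static Optional<Class<?>> loadEither(String scala212Name, String scala213Name) {
        try {
            return Optional.of(Class.forName(scala212Name));
        } catch (ClassNotFoundException first) {
            try {
                return Optional.of(Class.forName(scala213Name));
            } catch (ClassNotFoundException second) {
                // Previously this branch returned silently. Missing *both* classes
                // usually means a broken classpath, so surface it instead.
                LOG.warning("Neither " + scala212Name + " nor " + scala213Name
                    + " found on the classpath: " + second);
                return Optional.empty();
            }
        }
    }
}
```

In the normal case exactly one of the two lookups succeeds, so the warning fires only on a genuinely broken classpath.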
Per /ultrareview feedback: the five `"$spark_version" >= "3.5.0"` checks were lexicographic string comparisons. They happened to work for 3.5.0 and 4.0.2 only because '4' > '3' as chars — a future "3.10.0" release would compare less than "3.5.0" and silently drop the Spark 3.5+ dependencies and exclusions. Introduce an `isSparkAtLeast` closure that tokenizes on `.` and `-`, keeps numeric parts, and compares component-by-component. Replace all five call sites.
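The comparison logic described above — tokenize on `.` and `-`, keep numeric parts, compare component-by-component — can be illustrated in Java (the actual `isSparkAtLeast` is a Gradle/Groovy closure not shown here, so this is a reconstruction of its described behavior, not the merged code):

```java
import java.util.Arrays;

public class VersionCompare {
    // Numeric, component-wise version comparison. Missing components count as 0,
    // so "4.0" >= "4.0.0". Non-numeric tokens (e.g. "SNAPSHOT") are dropped.
    static boolean isSparkAtLeast(String version, String minimum) {
        int[] v = parse(version);
        int[] m = parse(minimum);
        for (int i = 0; i < Math.max(v.length, m.length); i++) {
            int a = i < v.length ? v[i] : 0;
            int b = i < m.length ? m[i] : 0;
            if (a != b) {
                return a > b;
            }
        }
        return true; // equal versions satisfy "at least"
    }

    private static int[] parse(String version) {
        return Arrays.stream(version.split("[.\\-]"))
            .filter(p -> p.matches("\\d+"))
            .mapToInt(Integer::parseInt)
            .toArray();
    }

    public static void main(String[] args) {
        // The lexicographic bug: "3.10.0" sorts before "3.5.0" as a string...
        System.out.println("3.10.0".compareTo("3.5.0") < 0); // true (the bug)
        // ...but compares correctly component-by-component:
        System.out.println(isSparkAtLeast("3.10.0", "3.5.0")); // true
        System.out.println(isSparkAtLeast("4.0.2", "3.5.0"));  // true
        System.out.println(isSparkAtLeast("3.4.1", "3.5.0"));  // false
    }
}
```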
With spark_runner.gradle now layering per-major source overrides on top of the shared base, runners/spark/4/src/ no longer needs to duplicate 62 byte-identical structured-streaming files. Keep only the 11 files that actually differ for Spark 4 / Scala 2.13. Switch the build.gradle to spark_major = '4' (the new mechanism) and bump spark_versions to 3,4. Compiled output unchanged — the deleted files are reproduced identically inside build/source-overrides by the Copy task.
Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces support for Apache Spark 4 in the Beam Spark runner. By leveraging the existing shared base and per-version override plumbing, the implementation remains lightweight and avoids duplicating the Spark 3 source tree. The changes include necessary build system updates, compatibility adjustments for Scala 2.13, and the addition of required CI infrastructure to ensure stability for the new Spark 4 runner.
scala.Serializable was removed in Scala 2.13. java.io.Serializable works identically on both Scala 2.12 and 2.13, so this can live in the shared base instead of needing a Spark-4-only override file.
…base Wrap Throwables.getRootCause(e).getMessage() in String.valueOf(...) to make the error logging robust to a null root-cause message. The behaviour change applies equally to Spark 3 and Spark 4, so the fix lives in the shared base and the Spark-4 override is dropped.
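The null-safety fix above is small enough to demonstrate self-contained. Guava's Throwables.getRootCause is re-implemented inline here for illustration (the real code uses the vendored Guava helper); the key point is that String.valueOf(...) turns a null root-cause message into the string "null" instead of propagating null into the log call.

```java
public class RootCauseLogging {
    // Minimal stand-in for Guava's Throwables.getRootCause (no cycle detection).
    static Throwable getRootCause(Throwable t) {
        Throwable cause;
        while ((cause = t.getCause()) != null) {
            t = cause;
        }
        return t;
    }

    // Throwable.getMessage() may legitimately return null (e.g. exceptions
    // constructed without a message). Wrapping in String.valueOf keeps the
    // error-logging path robust instead of passing null downstream.
    static String rootCauseMessage(Exception e) {
        return String.valueOf(getRootCause(e).getMessage());
    }
}
```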
… PipelineResult

Two changes that previously lived only in the Spark-4 override and are equally valid for Spark 3:
1. cancel() now actually cancels the executing future (pipelineExecution.cancel(true)) in addition to setting the state to CANCELLED. Without this, calling cancel() left the pipeline running silently — a real bug, not a Spark-4-specific concern.
2. Switch from Spark's shaded Guava (org.sparkproject.guava) to the Beam-vendored Guava that is already on the classpath. Spark 4 no longer exposes the sparkproject Guava package; using the vendored one removes the version coupling for both runners.
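The cancel() bug in point 1 comes down to standard java.util.concurrent semantics: setting a state flag does nothing to a thread already running the pipeline, while Future.cancel(true) delivers an interrupt. A minimal sketch (field and variable names are illustrative, not the actual SparkStructuredStreamingPipelineResult code):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CancelDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<?> pipelineExecution = executor.submit(() -> {
            try {
                Thread.sleep(60_000); // stand-in for a long-running pipeline
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // cancellation arrives as an interrupt
            }
        });
        // Before the fix: only a state field was set to CANCELLED and this call
        // was missing, so the task kept running. cancel(true) interrupts it.
        boolean cancelled = pipelineExecution.cancel(true);
        System.out.println(cancelled && pipelineExecution.isCancelled()); // prints "true"
        executor.shutdown();
    }
}
```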
Empty commit to re-run CI. The only failure on the prior head was UnboundedScheduledExecutorServiceTest.testThreadsAreAddedOnlyAsNeededWithContention, a known flake (apache#31590) — the test itself acknowledges contention-induced extra threads in its inline comment. Squash or drop on rebase before merge.
/gemini review
Code Review
This pull request introduces an experimental Spark 4 structured streaming runner for Java, built against Spark 4.0.2 and Scala 2.13, requiring Java 17. The changes include new modules for the Spark 4 runner and job server, updates to shared Spark source for Scala 2.12/2.13 compatibility, and various dependency adjustments. Feedback includes a critical fix for handling multi-windowed data in EncoderHelpers, removing redundant or unchecked casts in BoundedDatasetFactory and GroupByKeyTranslatorBatch, and addressing fragile reflection and implementation details in EncoderFactory.
Iterables.getOnlyElement(windows) crashes with IllegalArgumentException when a WindowedValue is associated with more than one window (e.g. after a sliding window assignment). Compute the max maxTimestamp() across all associated windows instead, falling back to a clear error if the iterable is unexpectedly empty. Applied identically to the shared base and the Spark 4 override. Flagged by Gemini Code Assist on PR apache#38255.
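The replacement logic can be sketched without any Beam dependency. The Window interface below is a hypothetical stand-in for Beam's BoundedWindow (only maxTimestamp() matters here), and java.time.Instant stands in for Joda-Time; the real fix operates on WindowedValue's window collection.

```java
import java.time.Instant;
import java.util.Arrays;
import java.util.Collection;

public class WindowTimestamps {
    // Stand-in for org.apache.beam.sdk.transforms.windowing.BoundedWindow.
    interface Window {
        Instant maxTimestamp();
    }

    // Instead of Iterables.getOnlyElement(windows) — which throws the moment a
    // value sits in two windows (e.g. sliding windows) — take the maximum
    // maxTimestamp() across all windows, failing clearly on an empty input.
    static Instant maxWindowTimestamp(Collection<? extends Window> windows) {
        return windows.stream()
            .map(Window::maxTimestamp)
            .max(Instant::compareTo)
            .orElseThrow(() -> new IllegalStateException("value belongs to no window"));
    }

    public static void main(String[] args) {
        Window w1 = () -> Instant.parse("2024-01-01T00:00:10Z");
        Window w2 = () -> Instant.parse("2024-01-01T00:00:20Z");
        // Two overlapping sliding windows: getOnlyElement would throw here.
        System.out.println(maxWindowTimestamp(Arrays.asList(w1, w2)));
    }
}
```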
source.split returns List<? extends BoundedSource<T>>, which already satisfies the subsequent stream usage. The cast was unchecked and would trip heap-pollution warnings. Applied identically to the shared base and the Spark 4 override. Flagged by Gemini Code Assist on PR apache#38255.
/gemini review
Code Review
This pull request introduces an experimental Spark 4 runner for the Java SDK, supporting batch processing with Spark 4.0.2 and Scala 2.13 on Java 17. It adds new modules for the Spark 4 runner and job server, while updating shared Spark source code to maintain compatibility with both Spark 3 and Spark 4. Key changes include the implementation of Spark 4-specific translators and encoders, and the migration from JavaConversions to JavaConverters. A redundant unchecked cast was identified in GroupByKeyHelpers.java where v.getWindows() can be used directly.
End-to-end smoke test (Spark 4.0.2 / Scala 2.13 / Java 17): Built A couple of items worth flagging before merge:
I remember this happened for Spark 3 as well: #26985. Yeah, we can follow up on documentation or exclude dependencies.
WindowedValue#getWindows() returns Collection<? extends BoundedWindow>, which is already an Iterable and can be passed straight to ScalaInterop.scalaIterator(...). The intermediate local variable and the unchecked cast to Collection<BoundedWindow> were redundant. Applied in both the shared base and the Spark 4 override. Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
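This cleanup and the source.split one above follow the same generics principle: a read-only consumer can accept the wildcard type directly, so the unchecked downcast was never needed. A self-contained sketch (the BoundedWindow/IntervalWindow names mimic Beam's but are local stand-ins):

```java
import java.util.Collection;
import java.util.List;

public class WildcardCasts {
    public interface BoundedWindow {}
    public record IntervalWindow(long start) implements BoundedWindow {}

    // A consumer that only reads can take Iterable<? extends BoundedWindow>
    // directly — analogous to passing getWindows() straight to scalaIterator(...).
    static int countWindows(Iterable<? extends BoundedWindow> windows) {
        int n = 0;
        for (BoundedWindow ignored : windows) {
            n++;
        }
        return n;
    }

    public static void main(String[] args) {
        // getWindows()-style return type: Collection<? extends BoundedWindow>.
        Collection<? extends IntervalWindow> windows =
            List.of(new IntervalWindow(0), new IntervalWindow(10));
        // No intermediate variable, no unchecked (Collection<BoundedWindow>) cast:
        System.out.println(countWindows(windows)); // prints "2"
    }
}
```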
Documents the Spark 4 runner's requirements (Java 17, Scala 2.13, Spark 4.0.x, batch-only) and the slf4j-jdk14 ↔ jul-to-slf4j conflict that is the Spark 4 manifestation of apache#26985 (fixed for Spark 3 in apache#27001). The shared spark_runner.gradle already excludes slf4j-jdk14 for in-tree builds; this note tells downstream consumers to mirror the exclude when assembling their own runtime classpath against beam-runners-spark-4.
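For downstream consumers, the exclude described above would look roughly like this in Gradle. This is a hedged sketch: the artifact coordinates and version are assumptions (the beam-runners-spark-4 name comes from the note above; 2.74.0 from the CHANGES.md section mentioned earlier), not copied from the documentation being added.

```groovy
dependencies {
    implementation("org.apache.beam:beam-runners-spark-4:2.74.0") {
        // Spark pulls in slf4j-jdk14, which conflicts with jul-to-slf4j on the
        // runtime classpath (the Spark 4 analogue of apache#26985 / apache#27001).
        exclude group: "org.slf4j", module: "slf4j-jdk14"
    }
}
```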
Two follow-ups pushed (
PTAL when you get a chance or have some spare time, @Abacn
/gemini review
Code Review
This pull request introduces an experimental Spark 4 runner for the Java SDK, supporting batch processing with Spark 4.0.2 and Scala 2.13 on Java 17. It adds the :runners:spark:4 module and job-server components while updating shared Spark source code to ensure compatibility across Scala versions. Feedback was provided to correct a terminology inconsistency in an error message within the EncoderHelpers class to use "encoder" instead of "coder".
Re-tested at
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment
The PreCommit Java failure on the previous run was a single timeout in FlinkRequiresStableInputTest.testParDoRequiresStableInputPortable (:runners:flink:1.17:test) — known flake tracked in apache#21333. This PR does not touch any Flink code. Squash or drop on rebase before merge.
The PreCommit Java failure on This is the known Flink flake tracked in #21333, and the PR does not touch any Flink code. Pushed an empty commit (
Assigning reviewers: R: @damccorm for label build. Note: If you would like to opt out of this review, comment Available commands:
The PR bot will only process comments in the main thread (not review comments).
Three small cleanups on top of the primaryConstructor() helper to match the equivalent code already merged into the Spark 4 override (runners/spark/4/src/.../EncoderFactory.java) in PR apache#38255 commit 9c071c5:
- Replace the stale one-line comment ("default constructor to reflectively create static invoke expressions") with the four-line explanation block from the override, so future readers of the shared base see the same rationale.
- Collapse the INVOKE_CONSTRUCTOR declaration onto a single line for formatting consistency with the override.
- Drop a stray double blank line before create() that would fail Spotless, and add a single blank line before primaryConstructor() for readability.
No behavioral change.
Replace `(Constructor<X>) X.class.getConstructors()[0]` for StaticInvoke, Invoke, and NewInstance with a `primaryConstructor()` helper that picks the constructor with the most parameters. Class.getConstructors() returns constructors in JVM-defined order that is not guaranteed stable, so resolving the widest constructor explicitly makes the lookup robust to future Spark releases that add overloaded constructors. Today this is a no-op: StaticInvoke / Invoke / NewInstance only have one public constructor each in Spark 3.1.x through 3.5.x, so getConstructors()[0] and the widest constructor resolve to the same one. The change is purely defensive. Same fix has already landed in the Spark 4 override in PR apache#38255 (commit 9c071c5, flagged by Gemini Code Assist round 1). Porting it to the shared base keeps both code paths consistent.
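The widest-constructor lookup can be sketched in plain reflection. The Demo class and its constructors are invented for illustration; only the primaryConstructor() name and the most-parameters selection rule come from the commit message above.

```java
import java.lang.reflect.Constructor;
import java.util.Arrays;
import java.util.Comparator;

public class Constructors {
    // Pick the public constructor with the most parameters instead of relying on
    // the unspecified order of Class.getConstructors().
    @SuppressWarnings("unchecked")
    static <T> Constructor<T> primaryConstructor(Class<T> clazz) {
        return (Constructor<T>) Arrays.stream(clazz.getConstructors())
            .max(Comparator.comparingInt((Constructor<?> c) -> c.getParameterCount()))
            .orElseThrow(() -> new IllegalArgumentException(clazz + " has no public constructor"));
    }

    // Invented demo class with an overloaded constructor; the widest is selected
    // regardless of the JVM's enumeration order.
    public static class Demo {
        final String value;
        public Demo() { this("default"); }
        public Demo(String value) { this.value = value; }
    }

    public static void main(String[] args) throws Exception {
        Constructor<Demo> ctor = primaryConstructor(Demo.class);
        System.out.println(ctor.getParameterCount()); // prints "1"
        System.out.println(ctor.newInstance("hello").value); // prints "hello"
    }
}
```

With getConstructors()[0], the result would depend on which of Demo's two constructors the JVM happens to list first; the stream-max version is deterministic.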
|
@Abacn deferring the review to you since you've started (if you haven't built much context or don't have bandwidth, let me know and I'll take it) |
…oDynamicWithStrictTimeout Wall-clock-timing test (100ms inter-message + 150ms strict batch timeout) in sdks/java/io/amazon-web-services2 SQS — unrelated to this PR (no AWS2/SQS/Direct-runner files touched), and master is green for the same PreCommit on 6106b30.
`Java Wordcount Direct Runner (windows-latest)` failed at the :buildSrc configure step with HTTP 403 fetching legacy Spotless 5.6.1 transitive deps from repo.maven.apache.org (spotless-lib:2.7.0, durian-*:1.2.0, jgit:5.8.0). Network/infra flake — PR doesn't touch examples or buildSrc, master 'Java Tests' workflow consistently green.
Both checks failed on the prior empty retry commit (e19b80c). Reproduced locally at e19b80c: spotlessCheck and Spark checkStyleMain/Test all pass. PR doesn't touch any GCP IO code, and both checks were green on the immediately preceding branch commits (5abbb21, 604037f) and on master (6106b30, e01f711). Treating as infra flakes; squash before merge.
|
Two PreCommit failures on Reasoning:
Squash-me marker: |
Codecov Report

❌ Patch coverage is

Coverage diff and impacted files:

@@             Coverage Diff             @@
##             master   #38255      +/-  ##
============================================
- Coverage     58.51%   57.59%    -0.92%
+ Complexity    15437    13198     -2239
============================================
  Files          2853     2597      -256
  Lines        280125   266204   -13921
  Branches      12343    10952    -1391
============================================
- Hits         163913   153321   -10592
+ Misses       109789   107111    -2678
+ Partials       6423     5772     -651

Flags with carried forward coverage won't be shown.
Addresses #36841. Replaces #38212.
Builds on the shared base + per-version overrides plumbing introduced in #38233 (merged). With that in place, Spark 4 support is reduced to:
- runners/spark/4/src/ (no duplication of files that match the Spark 3 baseline)
- runners/spark/4/build.gradle, the job-server module, and the container Dockerfile
- runners/spark/src/gradle.properties: spark_versions=3,4

The diff is smaller compared to #38212, which predated the refactor and duplicated the entire Spark 3 source tree.
cc @Abacn — this is the slim follow-up we discussed.
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
- Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
- Update CHANGES.md with noteworthy changes.

See the Contributor Guide for more tips on how to make the review process smoother.