[AURON #2316] Support native scan for Paimon DSv2 (BatchScanExec) COW tables #2313
zhuxiangyi wants to merge 3 commits
Pull request overview
This PR extends Auron’s native scan acceleration for Apache Paimon from the existing Hive-registered table path to DSv2 catalogs by detecting Paimon DSv2 BatchScanExec scans, building a native file-scan plan (Parquet/ORC COW only), and executing via a new native leaf scan exec. It also adds an integration suite covering DSv2 behavior.
Changes:
- Add `PaimonScanSupport` to detect/plan DSv2 Paimon scans (COW + Parquet/ORC + supported types) and extract `DataSplit`/data files + partition rows.
- Introduce `NativePaimonV2TableScanExec` to execute planned DSv2 scans using existing native file scan protobuf nodes and metrics wiring.
- Extend `PaimonConvertProvider` to convert DSv2 `BatchScanExec` to the new native exec, and add DSv2 integration tests + test dependencies.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| thirdparty/auron-paimon/src/test/scala/org/apache/auron/paimon/BaseAuronPaimonSuite.scala | Adds shared Spark/Paimon DSv2 catalog test session configuration and warehouse cleanup. |
| thirdparty/auron-paimon/src/test/scala/org/apache/auron/paimon/AuronPaimonV2IntegrationSuite.scala | Adds integration coverage for DSv2 Paimon native scans (projection/partitioning/ORC/metrics/fallbacks). |
| thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/hive/auron/paimon/PaimonConvertProvider.scala | Adds DSv2 BatchScanExec conversion path using PaimonScanSupport and NativePaimonV2TableScanExec. |
| thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativePaimonV2TableScanExec.scala | Implements the native DSv2 Paimon table scan exec using the existing native file scan infrastructure. |
| thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/auron/paimon/PaimonScanSupport.scala | Adds DSv2 planning support: reflection-based Paimon scan detection, split extraction, and file/partition reconstruction. |
| thirdparty/auron-paimon/pom.xml | Adds Paimon Spark bundle + Spark test-jars needed for the new DSv2 integration tests. |

`NativePaimonV2TableScanExec.scala`:

    // Project the read schema onto the (fileSchema ++ partitionSchema) layout used by the native scan.
    private lazy val projection: Seq[Integer] = {
      val combined = StructType(fileSchema.fields ++ partitionSchema.fields)
      plan.readSchema.fields.map(f => Integer.valueOf(combined.fieldIndex(f.name)))
    }

`AuronPaimonV2IntegrationSuite.scala`:

    test("paimon v2 native scan handles empty table") {
      withTable("paimon.db.t_empty") {
        sql("create table paimon.db.t_empty (id int, v string) using paimon")
        val df = sql("select * from paimon.db.t_empty")
        checkAnswer(df, Seq.empty)
      }

`PaimonScanSupport.scala`:

    val partitions = inputPartitions(exec)
    if (partitions.isEmpty) {
      logDebug("Paimon scan planned with empty input partitions.")
      return Some(
        PaimonScanPlan(table, Seq.empty, fileFormat, readSchema, fileSchema, partitionSchema))
    }

merrily01 left a comment
Thanks for the contribution, @zhuxiangyi. Could you create an issue to track this and avoid the CI check failure?

@merrily01 Filed tracking issue #2316 and renamed the PR title to

Pushed a follow-up commit addressing the three Copilot review comments:

Thank you, the CI has been re-triggered.

Auron currently only matches Paimon tables exposed via HiveTableScanExec. Paimon tables registered through DSv2 catalogs (e.g. `paimon.db.t`) are planned as BatchScanExec and were skipped by the native converter.

This change adds a parallel DSv2 path:
- PaimonScanSupport: reflection-based detection of PaimonBaseScan / PaimonInputPartition so the integration does not require a hard Maven dependency on paimon-spark across Spark versions. Collects DataSplits, rejects MOR/MOW splits (rawConvertible=false or non-empty deletion files), and builds per-split partition values via RowDataToObjectArrayConverter.
- NativePaimonV2TableScanExec: leaf native exec mirroring the Iceberg pattern; supports Parquet/ORC, computes split sizes from filesMaxPartitionBytes, exposes numPartitions/numFiles driver metrics, and returns EmptyNativeRDD for empty inputs.
- PaimonConvertProvider: dispatches BatchScanExec to the new path, reusing spark.auron.enable.paimon.scan.

Integration tests (AuronPaimonV2IntegrationSuite) cover: simple COW select, projection, partitioned + predicate, ORC format, empty table, driver metrics propagation, disable-flag fallback, and MOR primary-key table fallback.

- NativePaimonV2TableScanExec: resolve projection field index via SQLConf.caseSensitiveAnalysis (mirrors NativeIcebergTableScanExec) and derive projection from output attributes, fixing a case-insensitive analysis edge case.
- PaimonScanSupport.inputPartitions: distinguish "no partitions" from "failed to obtain partitions"; planning failure now falls back to Spark instead of silently returning a zero-row scan.
- AuronPaimonV2IntegrationSuite: empty-table test now asserts the native scan is applied so regressions to Spark fallback are caught.

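The projection fix in the first bullet can be illustrated without Spark. The following is only a minimal sketch under stated assumptions: `Field`, `ProjectionSketch`, and `resolve` are hypothetical names invented for this example, and the real code works on Spark's `StructType` and output attributes rather than plain strings. It shows why an exact-name lookup fails when analysis is case-insensitive and the requested column differs from the stored one only by case.

```scala
// Hypothetical stand-in for a schema field; not a Spark or Auron type.
final case class Field(name: String)

object ProjectionSketch {
  /** Maps each requested column to its index in the combined
    * (fileSchema ++ partitionSchema) layout.
    * Returns None if any requested column cannot be resolved. */
  def resolve(
      requested: Seq[String],
      fileSchema: Seq[Field],
      partitionSchema: Seq[Field],
      caseSensitive: Boolean): Option[Seq[Int]] = {
    val combined = (fileSchema ++ partitionSchema).map(_.name)

    // Name comparison honours the case-sensitivity setting.
    def sameName(a: String, b: String): Boolean =
      if (caseSensitive) a == b else a.equalsIgnoreCase(b)

    val indices = requested.map(name => combined.indexWhere(sameName(_, name)))
    if (indices.contains(-1)) None else Some(indices)
  }
}
```

With `caseSensitive = false`, a request for `ID` resolves to the stored column `id`; with `caseSensitive = true` the same request is unresolved, which is the case where a caller would want to fall back rather than throw.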
…erg workflow

The Paimon CI workflow previously ran `mvn test -pl ... -am`, which also executed tests for upstream dependency modules. After adding spark-extension-shims-spark as a test dependency, the shims module's tests were pulled into the Paimon CI run and failed because auron-build-info.properties was not generated in that build context.

Split the step into two phases (matching the Iceberg CI pattern):
1. `mvn install -pl ... -am -DskipTests` to build and install all dependencies (including auron-core, which generates the properties file)
2. `mvn test -pl ...` to run only the Paimon module's own tests

519b51d to b18c515

@merrily01 I've fixed the CI error; could you please help me trigger CI again? |
Which issue does this PR close?
N/A — extends the Paimon native scan support to DSv2 catalogs. Happy to file a tracking issue if preferred.
Rationale for this change
`PaimonConvertProvider` only matches `HiveTableScanExec` today, so Paimon tables registered through a DSv2 catalog (e.g. `spark.sql.catalog.paimon = org.apache.paimon.spark.SparkCatalog`, accessed as `paimon.db.t`) fall back to Spark execution even when the underlying layout is a raw-convertible COW Parquet/ORC dataset that Auron can read natively.

This PR adds a parallel DSv2 path that reuses the existing native file scan infrastructure (the same protobufs used by `NativeIcebergTableScanExec` / native FileSourceScan) so DSv2 Paimon scans get the same native acceleration as the Hive-registered path.

What changes are included in this PR?
`PaimonScanSupport` is the entry point that:
- detects Paimon DSv2 scans via reflection (`org.apache.paimon.spark.PaimonBaseScan` / `PaimonInputPartition`), so the module does not pull a hard Maven dependency on a particular `paimon-spark-*` artifact at compile time;
- for the underlying `FileStoreTable`, verifies COW + supported file format (Parquet/ORC) + supported column types;
- collects `DataSplit`s from input partitions and rejects splits where `!rawConvertible()` or deletion files are present (MOR/MOW with positional / equality deletes);
- builds each split's partition `InternalRow` via `RowDataToObjectArrayConverter` so partition columns land in Auron's `(fileSchema ++ partitionSchema)` layout.

`NativePaimonV2TableScanExec` is a leaf native exec built on the existing `NativeRDD` / `NativeSupports` machinery. Mirrors `NativeIcebergTableScanExec`:
- computes split sizes from `filesMaxPartitionBytes` (both Parquet and ORC are splittable here);
- uses `pb.ParquetScanExecNode` / `pb.OrcScanExecNode`;
- exposes `numPartitions` / `numFiles` driver metrics and forwards `bytes_scanned` / `output_rows` to `TaskMetrics.inputMetrics`;
- returns `EmptyNativeRDD` when there are no partitions to read.

`PaimonConvertProvider` adds a `BatchScanExec` branch dispatching to the new path. It reuses the existing `spark.auron.enable.paimon.scan` config, so there is no new feature flag.

The Hive-registered Paimon path (`NativePaimonTableScanExec`) is untouched.

Are there any user-facing changes?
Yes, purely additive: when `spark.auron.enable.paimon.scan=true` (default), Paimon DSv2 catalog reads on COW Parquet/ORC tables that pass the safety checks above will now be executed natively. Tables that are MOR/MOW or use deletion files continue to fall back to Spark execution (logged at DEBUG with the reason).

No new configuration knobs are introduced.
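
The fallback rules described in this PR (reject any split that is not raw-convertible or that carries deletion files, treat an empty partition list as a valid zero-row native scan, and fall back when partitions could not be obtained at all) might be modelled roughly as below. This is a sketch, not the PR's code: `SplitInfo`, `ScanDecision`, and `SplitCheckSketch` are hypothetical types introduced for illustration, and the real implementation inspects Paimon `DataSplit` objects.

```scala
// Hypothetical, simplified view of a Paimon DataSplit; field names are illustrative.
final case class SplitInfo(
    rawConvertible: Boolean,
    deletionFiles: Seq[String],
    dataFiles: Seq[String])

sealed trait ScanDecision
case object FallBackToSpark extends ScanDecision
final case class NativeScan(dataFiles: Seq[String]) extends ScanDecision

object SplitCheckSketch {
  /** `None` models "failed to obtain partitions"; `Some(Nil)` models "no partitions". */
  def decide(splits: Option[Seq[SplitInfo]]): ScanDecision = splits match {
    // Planning failed: never pretend the table is empty.
    case None => FallBackToSpark
    // Any MOR/MOW-style split makes the whole scan unsafe for the native reader.
    case Some(all) if all.exists(s => !s.rawConvertible || s.deletionFiles.nonEmpty) =>
      FallBackToSpark
    // All splits are plain data files; an empty list is a valid zero-row scan.
    case Some(all) => NativeScan(all.flatMap(_.dataFiles))
  }
}
```

Keeping "failed" and "empty" as distinct cases is the point of the `Option`: collapsing them would turn a planning error into a silently empty result.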
How was this patch tested?
New integration suite `AuronPaimonV2IntegrationSuite` (extends `SharedSparkSession`, wires `AuronSparkSessionExtension` + `PaimonSparkSessionExtensions`, uses a temp-dir warehouse via the `paimon` DSv2 catalog) covers:
- simple COW select (`NativePaimonV2TableScan` in executed plan);
- column projection;
- partitioned table with a predicate;
- ORC format (`file.format=orc`);
- empty table (`EmptyNativeRDD` path returns no rows);
- driver metrics (`numPartitions` / `numFiles`) actually posted via `SparkListenerDriverAccumUpdates`;
- `spark.auron.enable.paimon.scan=false` falls back to Spark;
- MOR primary-key table falls back to Spark.

The existing Paimon CI workflow (`.github/workflows/paimon.yml`, matrix `paimon-1.2 / spark-3.5 / scala-2.12`) picks up the new suite automatically; `paimon-spark-${shortSparkVersion}:${paimonVersion}` is added as a test-scope dependency (uses the shaded `paimon-spark-3.5:1.2.0` bundle published on Maven Central).

`mvn test-compile -pl thirdparty/auron-paimon -am -Pscala-2.12 -Ppaimon-1.2 -Pspark-3.5 -Prelease` builds cleanly with scalastyle passing.
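
For readers unfamiliar with how a maximum partition size turns one data file into several scan ranges, here is a simplified sketch. It assumes a single fixed `maxSplitBytes` value and fixed-size byte ranges; `FileRange` and `SplitSizingSketch` are hypothetical names, and the actual calculation in `NativePaimonV2TableScanExec` is not shown in this conversation and may differ.

```scala
// Hypothetical byte range within a data file; not an Auron or Spark type.
final case class FileRange(path: String, start: Long, length: Long)

object SplitSizingSketch {
  /** Cuts a file of `fileSize` bytes into consecutive ranges of at most `maxSplitBytes`. */
  def splitFile(path: String, fileSize: Long, maxSplitBytes: Long): Seq[FileRange] = {
    require(maxSplitBytes > 0, "maxSplitBytes must be positive")
    (0L until fileSize by maxSplitBytes).map { start =>
      // The last range is shorter when the size is not a multiple of maxSplitBytes.
      FileRange(path, start, math.min(maxSplitBytes, fileSize - start))
    }
  }
}
```

A 250-byte file with `maxSplitBytes = 100` yields ranges starting at 0, 100, and 200, the last one 50 bytes long; a zero-byte file yields no ranges.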