[lake/hudi] Introduce Hudi compaction in tiering service #3518
Pull request overview
This PR completes the Hudi MOR tiering lifecycle by adding compaction support to the lake tiering writer/committer flow, and updating source-side timeline/split planning to safely handle compaction-related file slices and instants.
Changes:
- Introduce HudiCompactionService to schedule, mark inflight, execute, commit, and roll back Hudi MOR compactions.
- Extend HudiLakeWriter to execute inflight compactions asynchronously and attach compaction write stats to the write result; extend HudiLakeCommitter to commit compaction results and schedule the next compaction round.
- Update timeline/split planning and tests to account for completed compaction instants and to skip empty MOR file slices.
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/tiering/HudiTieringTest.java | Updates commit validation test to reflect new write-stats expectations. |
| fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/tiering/HudiTieringITCase.java | Enables auto-compaction in the IT case and validates compaction completion across tiering rounds. |
| fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/testutils/FlinkHudiTieringTestBase.java | Adds compaction assertions, improves MOR file-slice selection, and skips empty slices in reads. |
| fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/HudiTableInfo.java | Includes completed compaction instants in the “completed timeline” view. |
| fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/tiering/HudiLakeWriter.java | Executes inflight compaction asynchronously for MOR when auto-compaction is enabled. |
| fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/tiering/HudiLakeCommitter.java | Commits compaction results, schedules next compaction, and rolls back compaction instants on abort. |
| fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/tiering/HudiCompactionService.java | New service implementing compaction scheduling/inflight transition/execution/commit/rollback. |
| fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/source/HudiSplitPlanner.java | Skips empty MOR file slices during split planning. |
luoyuxia left a comment
@XuQianJin-Stars Could you please help review this PR?
| .getLatestMergedFileSlicesBeforeOrOn(partitionPath, snapshotTime) | ||
| .collect(Collectors.toList()); | ||
| for (FileSlice fileSlice : fileSlices) { | ||
| if (fileSlice.isEmpty()) { |
Skipping empty file slices is correct for MOR, but please add a short comment explaining why we can encounter an empty slice here (e.g. a pending compaction has scheduled a new file group but no base file has been written yet). Without that context the next reader will likely wonder if this is hiding a bug.
| e); | ||
| } | ||
| }) | ||
| .filter(pair -> isValidCompactionPlan(pair.getRight())) |
Plans that fail isValidCompactionPlan are filtered out here, but at this point markSelectedCompactionsInflight has already transitioned the corresponding requested instants to inflight. Nothing in the codebase rolls those inflight instants back, which leaves them as orphan inflight compactions on the timeline forever and will block future compaction scheduling on the same file groups.
Either (a) validate the plan before marking it inflight, or (b) explicitly roll back invalid plans here via table.rollbackInflightCompaction(...) before filtering them out.
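To make option (a) concrete, here is a minimal sketch of the ordering I have in mind. It uses hypothetical stand-in types (Plan, Timeline, isValid) instead of the real Hudi classes, and the validity rule is an assumption made only for the example; the point is that validation runs before the requested-to-inflight transition.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins only; none of these types are the real Hudi or Fluss classes.
final class ValidateBeforeInflightSketch {

    /** A scheduled compaction plan, reduced to its instant time and operation count. */
    static final class Plan {
        final String instantTime;
        final int operationCount;

        Plan(String instantTime, int operationCount) {
            this.instantTime = instantTime;
            this.operationCount = operationCount;
        }
    }

    /** Records which instants were moved from requested to inflight. */
    static final class Timeline {
        final List<String> inflight = new ArrayList<>();

        void markInflight(String instantTime) {
            inflight.add(instantTime);
        }
    }

    // Assumed validity rule for this sketch: a plan needs at least one operation.
    static boolean isValid(Plan plan) {
        return plan.operationCount > 0;
    }

    /** Validates first, so an invalid plan never leaves the requested state. */
    static List<Plan> selectAndMarkInflight(List<Plan> requested, Timeline timeline) {
        List<Plan> selected = new ArrayList<>();
        for (Plan plan : requested) {
            if (!isValid(plan)) {
                continue; // still "requested", so there is nothing to roll back
            }
            timeline.markInflight(plan.instantTime);
            selected.add(plan);
        }
        return selected;
    }

    public static void main(String[] args) {
        Timeline timeline = new Timeline();
        List<Plan> selected =
                selectAndMarkInflight(
                        Arrays.asList(new Plan("001", 3), new Plan("002", 0)), timeline);
        System.out.println(selected.size() + " selected, inflight=" + timeline.inflight);
    }
}
```

With this ordering the filter after the inflight transition becomes unnecessary, because only valid plans ever reach the inflight state.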
| CompactionOperation compactionOperation = | ||
| CompactionOperation.convertFromAvroRecordInstance(operation); | ||
| metaClient.reload(); |
metaClient.reload() (and setAvroSchema on the next line) are called once per compaction operation. A single plan typically contains many operations across the same file groups, so reloading the metaClient inside the inner loop is wasted work. Move reload() and setAvroSchema() out to the per-plan loop (around line 219); they only need to run once per instant.
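Roughly the loop shape I am suggesting, as a sketch with a hypothetical FakeMetaClient that only counts reloads (the real metaClient and compactor calls are omitted on purpose):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins; the real code would call the Hudi meta client and compactor.
final class HoistReloadSketch {

    /** Counts reload() calls so the cost of the loop structure is visible. */
    static final class FakeMetaClient {
        int reloadCount = 0;

        void reload() {
            reloadCount++;
        }
    }

    /** Reloads once per instant (outer loop) instead of once per operation (inner loop). */
    static List<String> executePlans(
            Map<String, List<String>> operationsByInstant, FakeMetaClient metaClient) {
        List<String> executed = new ArrayList<>();
        for (Map.Entry<String, List<String>> plan : operationsByInstant.entrySet()) {
            metaClient.reload(); // per instant; schema setup would sit here as well
            for (String operation : plan.getValue()) {
                // Placeholder for compacting a single operation.
                executed.add(plan.getKey() + "/" + operation);
            }
        }
        return executed;
    }

    public static void main(String[] args) {
        Map<String, List<String>> plans = new LinkedHashMap<>();
        plans.put("001", Arrays.asList("op-a", "op-b", "op-c"));
        plans.put("002", Arrays.asList("op-d"));
        FakeMetaClient metaClient = new FakeMetaClient();
        List<String> executed = executePlans(plans, metaClient);
        System.out.println(executed.size() + " operations, " + metaClient.reloadCount + " reloads");
    }
}
```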
| createReaderContext(true), | ||
| table); | ||
| writeStatusesByInstant | ||
| .computeIfAbsent(instantTime, ignored -> new ArrayList<>()) |
If compactor.compact(...) fails partway through, the writeStatuses already added to writeStatusesByInstant for prior operations of the same instant are silently dropped (we throw and the map never reaches commitCompaction), but the inflight instant on the timeline is not rolled back here. That can leave half-written log/base files on storage and an inflight instant that will be re-attempted later with stale data.
Please either roll back the inflight instant in a catch/finally (via CompactionUtil.rollbackCompaction(table, instantTime, txnManager)) before rethrowing, or document very clearly that recovery is delegated to the next round (and add an integration test that exercises it).
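A sketch of the first option, assuming a hypothetical Compactor interface and a FakeTimeline whose rollback(...) stands in for whatever rollback call the service ends up using. It is not the actual Hudi API; it only shows the catch, roll back, rethrow shape.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins; the real rollback call and compactor are not modelled here.
final class RollbackOnFailureSketch {

    /** Compacts one operation and returns a write status, or fails. */
    interface Compactor {
        String compact(String operation) throws IOException;
    }

    /** Records which inflight instants were rolled back. */
    static final class FakeTimeline {
        final List<String> rolledBack = new ArrayList<>();

        void rollback(String instantTime) {
            rolledBack.add(instantTime);
        }
    }

    /** Executes every operation of one instant; rolls the instant back if any of them fails. */
    static List<String> executeInstant(
            String instantTime, List<String> operations, Compactor compactor, FakeTimeline timeline)
            throws IOException {
        List<String> writeStatuses = new ArrayList<>();
        try {
            for (String operation : operations) {
                writeStatuses.add(compactor.compact(operation));
            }
            return writeStatuses;
        } catch (IOException | RuntimeException e) {
            try {
                timeline.rollback(instantTime);
            } catch (RuntimeException rollbackFailure) {
                // Keep the original failure as the primary cause.
                e.addSuppressed(rollbackFailure);
            }
            throw e;
        }
    }

    public static void main(String[] args) {
        FakeTimeline timeline = new FakeTimeline();
        try {
            executeInstant(
                    "003",
                    Arrays.asList("op-a", "op-b"),
                    operation -> {
                        if (operation.equals("op-b")) {
                            throw new IOException("simulated failure");
                        }
                        return "status-" + operation;
                    },
                    timeline);
        } catch (IOException e) {
            System.out.println("failed: " + e.getMessage() + ", rolled back " + timeline.rolledBack);
        }
    }
}
```

The partial write statuses are still discarded, but the instant no longer stays inflight after the failure.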
| for (String compactionInstant : compactionInstants) { | ||
| HudiWriteStats writeStats = compactionWriteStats.get(compactionInstant); | ||
| if (writeStats.getTotalErrorRecords() > 0 && !conf.get(FlinkOptions.IGNORE_FAILED)) { |
throw new IOException(...) here will reach HudiLakeCommitter#commitCompactionAndSchedule (line 228), which only logs a WARN and continues. The error-record check therefore has no real effect on the main commit. Either:
- propagate the failure all the way up so the data commit also fails (preferred), or
- remove this throw and only roll back, since the current behaviour is misleading.
Otherwise users running with IGNORE_FAILED=false will silently get partial compaction results.
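For the preferred option, a minimal sketch of what "propagate" means: the caller declares the exception instead of catching and logging it. The method names and the map of error counts are hypothetical; only the error-record condition mirrors the check in the diff.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; method names and inputs are illustrative, not the PR's code.
final class PropagateCompactionFailureSketch {

    /** Same condition as the check in the diff: fail unless failures are explicitly ignored. */
    static void checkErrorRecords(String instantTime, long totalErrorRecords, boolean ignoreFailed)
            throws IOException {
        if (totalErrorRecords > 0 && !ignoreFailed) {
            throw new IOException(
                    "Compaction " + instantTime + " has " + totalErrorRecords + " error records");
        }
    }

    /** Commits each instant in order; the exception is declared, not swallowed with a WARN. */
    static List<String> commitCompactions(
            Map<String, Long> errorRecordsByInstant, boolean ignoreFailed) throws IOException {
        List<String> committed = new ArrayList<>();
        for (Map.Entry<String, Long> entry : errorRecordsByInstant.entrySet()) {
            checkErrorRecords(entry.getKey(), entry.getValue(), ignoreFailed);
            committed.add(entry.getKey());
        }
        return committed;
    }

    public static void main(String[] args) throws IOException {
        Map<String, Long> errors = new LinkedHashMap<>();
        errors.put("001", 0L);
        errors.put("002", 5L);
        // With ignoreFailed=true both instants are committed; with false the call throws.
        System.out.println(commitCompactions(errors, true));
    }
}
```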
| metaClient.getCommitsTimeline().filterCompletedInstants(); | ||
| metaClient | ||
| .getCommitsAndCompactionTimeline() | ||
| .filterCompletedAndCompactionInstants(); |
The semantics of completedTimeline have been silently changed from 'completed commits' to 'completed commits + any compaction state'. HudiSplitPlanner.java:62 uses getCompletedTimeline().containsInstant(snapshotTime) to decide whether a snapshot is readable. With the new definition, an inflight or requested compaction instant will be reported as readable, which can lead a reader to plan splits over partial state.
Please keep completedTimeline strictly completed-only and introduce a separate accessor (e.g. getFileSystemViewTimeline()) for the file-system view that needs the broader timeline. The two consumers have different correctness requirements.
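To illustrate the split, a sketch over a toy timeline model. The Instant and State types are hypothetical and much simpler than Hudi's timeline classes; fileSystemViewTimeline follows the accessor name suggested above, and the "completed plus any compaction state" rule is taken from the description of the new behaviour.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of a timeline; these are not the Hudi timeline classes.
final class TimelineViewsSketch {

    enum State { REQUESTED, INFLIGHT, COMPLETED }

    static final class Instant {
        final String time;
        final String action;
        final State state;

        Instant(String time, String action, State state) {
            this.time = time;
            this.action = action;
            this.state = state;
        }
    }

    /** Strictly completed instants: the only view that may decide snapshot readability. */
    static List<Instant> completedTimeline(List<Instant> all) {
        List<Instant> result = new ArrayList<>();
        for (Instant instant : all) {
            if (instant.state == State.COMPLETED) {
                result.add(instant);
            }
        }
        return result;
    }

    /** Broader view for the file-system view: completed instants plus compactions in any state. */
    static List<Instant> fileSystemViewTimeline(List<Instant> all) {
        List<Instant> result = new ArrayList<>();
        for (Instant instant : all) {
            if (instant.state == State.COMPLETED || instant.action.equals("compaction")) {
                result.add(instant);
            }
        }
        return result;
    }

    /** A snapshot is readable only if its instant is in the strict completed view. */
    static boolean isReadable(List<Instant> all, String snapshotTime) {
        for (Instant instant : completedTimeline(all)) {
            if (instant.time.equals(snapshotTime)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<Instant> all =
                Arrays.asList(
                        new Instant("001", "deltacommit", State.COMPLETED),
                        new Instant("002", "compaction", State.INFLIGHT));
        System.out.println("001 readable: " + isReadable(all, "001"));
        System.out.println("002 readable: " + isReadable(all, "002"));
    }
}
```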
| @@ -187,6 +189,7 @@ protected long createPkTable( | |||
| if (enableAutoCompaction) { | |||
| tableBuilder.property(ConfigOptions.TABLE_DATALAKE_AUTO_COMPACTION.key(), "true"); | |||
"hudi." is repeated in several places in this PR. Please extract a constant (the same prefix is implicitly defined elsewhere when Fluss forwards table options into Hudi config; reusing the existing constant would prevent drift).
| isCompactionInstant( | ||
| timeline, instant))) | ||
| .isTrue(); | ||
| } |
A 1-minute retry timeout on checkHudiCompactionCommitted is generous; please reduce to 30s or expose as a constant. Also, isCompactionInstant reads commit metadata and may throw IOException; if it does, the current implementation propagates a RuntimeException and fails the retry. For a polling helper it would be more lenient to return false on read errors and let the next poll succeed.
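Something along these lines would cover both points. This is a sketch with a hypothetical Check interface and constant name, not the existing retry utility in the test base: the timeout is a named 30s constant and a read error counts as "not yet" instead of aborting the poll.

```java
import java.io.IOException;
import java.time.Duration;

// Hypothetical polling helper; names are illustrative and not taken from the test base.
final class LenientPollSketch {

    /** Assumed constant for the suggested 30s timeout. */
    static final Duration COMPACTION_COMMIT_TIMEOUT = Duration.ofSeconds(30);

    /** A condition that may fail transiently while reading commit metadata. */
    interface Check {
        boolean test() throws IOException;
    }

    /** Polls until the check passes or the timeout elapses; read errors count as "not yet". */
    static boolean pollUntil(Check check, Duration timeout, Duration interval)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        do {
            try {
                if (check.test()) {
                    return true;
                }
            } catch (IOException e) {
                // Transient read error: fall through and let the next poll try again.
            }
            Thread.sleep(interval.toMillis());
        } while (System.nanoTime() < deadline);
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        final int[] calls = {0};
        boolean committed =
                pollUntil(
                        () -> {
                            calls[0]++;
                            if (calls[0] == 1) {
                                throw new IOException("metadata not readable yet");
                            }
                            return true;
                        },
                        COMPACTION_COMMIT_TIMEOUT,
                        Duration.ofMillis(10));
        System.out.println("committed=" + committed + " after " + calls[0] + " polls");
    }
}
```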
| return hudiTableInfo | ||
| .getFileSystemView() | ||
| .getLatestFileSlicesBeforeOrOn(partition, latestInstantTime, true) | ||
| .collect(Collectors.toList()); |
The MOR vs COW branches use different APIs (getLatestMergedFileSlicesBeforeOrOn vs getLatestFileSlicesBeforeOrOn(... , true)) - please add a one-line comment explaining why includeFileSliceBefore=true is needed for COW so future readers do not flip it back.
| FlinkHudiTieringTestBase.beforeAll(FLUSS_CLUSTER_EXTENSION.getClientConfig()); | ||
| } | ||
| @Test |
Flipping enableAutoCompaction from false to true for the existing PK-tiering test removes coverage of the non-compaction path. Please keep the original test (auto-compaction off) and add a new test method for auto-compaction on; otherwise we lose a regression baseline.
…ing to review results by XuQianJin-Stars
@XuQianJin-Stars Do you still have comments?
nothing
luoyuxia left a comment
@fhan688 @XuQianJin-Stars Thanks all. Merging..
Purpose
Linked issue: #3283
This PR completes the Hudi MOR tiering path by supporting Hudi compaction in the lake tiering service.
Before this change, Hudi tiering could write and commit data files/log files, but compaction write stats were not supported by the committer. As a result, table.datalake.auto-compaction could not form a complete schedule -> execute -> commit lifecycle for Hudi MOR tables.
Brief change log
- Introduce HudiCompactionService to coordinate Hudi compaction scheduling, inflight transition, bucket/partition-scoped execution, commit, and rollback.
- Extend HudiLakeWriter to asynchronously execute inflight compaction for MOR tables when data lake auto compaction is enabled.
- Extend HudiLakeCommitter to commit compaction results after the normal Hudi commit, schedule the next compaction round, and roll back compaction instants on abort.
Tests
- mvn -pl fluss-lake/fluss-lake-hudi -Dcheckstyle.skip=true spotless:apply
- git diff --check
- mvn -pl fluss-lake/fluss-lake-hudi -am -DskipITs=false -Dcheckstyle.skip=true -DfailIfNoTests=false -Dtest=HudiTieringTest,HudiTieringITCase test
API and Format
No public API changes.
No Fluss storage format changes.
This PR uses the existing data lake auto-compaction configuration path and Hudi MOR compaction timeline semantics.
Documentation
This PR introduces functional support for Hudi MOR auto compaction in the tiering service. No new configuration option is added.