Skip to content

[lake/hudi] Introduce Hudi compaction in tiering service#3518

Merged
luoyuxia merged 3 commits into
apache:mainfrom
fhan688:Introduce-Hudi-Compaction-Service
Jun 24, 2026
Merged

[lake/hudi] Introduce Hudi compaction in tiering service#3518
luoyuxia merged 3 commits into
apache:mainfrom
fhan688:Introduce-Hudi-Compaction-Service

Conversation

@fhan688

@fhan688 fhan688 commented Jun 24, 2026

Copy link
Copy Markdown
Contributor

Purpose

Linked issue: #3283

This PR completes the Hudi MOR tiering path by supporting Hudi compaction in the lake tiering service.

Before this change, Hudi tiering could write and commit data files/log files, but compaction write stats were not supported by the committer. As a result, table.datalake.auto-compaction could not form a complete schedule -> execute -> commit lifecycle for Hudi MOR tables.

Brief change log

  • Add HudiCompactionService to coordinate Hudi compaction scheduling, inflight transition, bucket/partition-scoped execution,
    commit, and rollback.
  • Extend HudiLakeWriter to asynchronously execute inflight compaction for MOR tables when data lake auto compaction is enabled.
  • Extend HudiLakeCommitter to commit compaction results after normal Hudi commit, schedule the next compaction round, and rollback compaction instants on abort.
  • Update Hudi source-side timeline handling to include completed compaction instants, so committed compaction snapshots can be discovered and read.
  • Skip empty MOR file slices during split planning/verification to handle pending or compacted file groups safely.
  • Extend Hudi tiering tests to cover the full compaction lifecycle across tiering rounds.

Tests

  • mvn -pl fluss-lake/fluss-lake-hudi -Dcheckstyle.skip=true spotless:apply
  • git diff --check
  • mvn -pl fluss-lake/fluss-lake-hudi -am -DskipITs=false -Dcheckstyle.skip=true -DfailIfNoTests=false -Dtest=HudiTieringTest,HudiTieringITCase test

API and Format

No public API changes.

No Fluss storage format changes.

This PR uses the existing data lake auto-compaction configuration path and Hudi MOR compaction timeline semantics.

Documentation

This introduces functional support for Hudi MOR auto compaction in tiering service. No new configuration option is added.

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR completes the Hudi MOR tiering lifecycle by adding compaction support to the lake tiering writer/committer flow, and updating source-side timeline/split planning to safely handle compaction-related file slices and instants.

Changes:

  • Introduce HudiCompactionService to schedule, mark inflight, execute, commit, and rollback Hudi MOR compactions.
  • Extend HudiLakeWriter to execute inflight compactions asynchronously and attach compaction write stats to the write result; extend HudiLakeCommitter to commit compaction results and schedule the next compaction round.
  • Update timeline/split planning and tests to account for completed compaction instants and to skip empty MOR file slices.

Reviewed changes

Copilot reviewed 8 out of 8 changed files in this pull request and generated 2 comments.

Show a summary per file
File Description
fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/tiering/HudiTieringTest.java Updates commit validation test to reflect new write-stats expectations.
fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/tiering/HudiTieringITCase.java Enables auto-compaction in the IT case and validates compaction completion across tiering rounds.
fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/testutils/FlinkHudiTieringTestBase.java Adds compaction assertions, improves MOR file-slice selection, and skips empty slices in reads.
fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/HudiTableInfo.java Includes completed compaction instants in the “completed timeline” view.
fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/tiering/HudiLakeWriter.java Executes inflight compaction asynchronously for MOR when auto-compaction is enabled.
fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/tiering/HudiLakeCommitter.java Commits compaction results, schedules next compaction, and rolls back compaction instants on abort.
fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/tiering/HudiCompactionService.java New service implementing compaction scheduling/inflight transition/execution/commit/rollback.
fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/source/HudiSplitPlanner.java Skips empty MOR file slices during split planning.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@luoyuxia luoyuxia left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@XuQianJin-Stars Could you please help review this pr?

@XuQianJin-Stars XuQianJin-Stars left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for adding MOR auto-compaction support. Overall the architecture (async execute on writer + schedule/commit on committer) is reasonable

.getLatestMergedFileSlicesBeforeOrOn(partitionPath, snapshotTime)
.collect(Collectors.toList());
for (FileSlice fileSlice : fileSlices) {
if (fileSlice.isEmpty()) {

@XuQianJin-Stars XuQianJin-Stars Jun 24, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Skipping empty file slices is correct for MOR, but please add a short comment explaining why we can encounter an empty slice here (e.g. a pending compaction has scheduled a new file group but no base file has been written yet). Without that context the next reader will likely wonder if this is hiding a bug.

e);
}
})
.filter(pair -> isValidCompactionPlan(pair.getRight()))

@XuQianJin-Stars XuQianJin-Stars Jun 24, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Plans that fail isValidCompactionPlan are filtered out here, but at this point markSelectedCompactionsInflight has already transitioned the corresponding requested instants to inflight. Nothing in the codebase rolls those inflight instants back, which leaves them as orphan inflight compactions on the timeline forever and will block future compaction scheduling on the same file groups.

Either (a) validate the plan before marking it inflight, or (b) explicitly rollback invalid plans here via table.rollbackInflightCompaction(...) before filtering them out.

CompactionOperation compactionOperation =
CompactionOperation.convertFromAvroRecordInstance(operation);

metaClient.reload();

@XuQianJin-Stars XuQianJin-Stars Jun 24, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

metaClient.reload() (and setAvroSchema on the next line) are called once per compaction operation. A single plan typically contains many operations across the same file groups, so reloading the metaClient inside the inner loop is wasted work. Move reload() and setAvroSchema() out to the per-plan loop (around line 219), they only need to run once per instant.

createReaderContext(true),
table);
writeStatusesByInstant
.computeIfAbsent(instantTime, ignored -> new ArrayList<>())

@XuQianJin-Stars XuQianJin-Stars Jun 24, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If compactor.compact(...) fails partway through, the writeStatuses already added to writeStatusesByInstant for prior operations of the same instant are silently dropped (we throw and the map never reaches commitCompaction), but the inflight instant on the timeline is not rolled back here. That can leave half-written log/base files on storage and an inflight instant that will be re-attempted later with stale data.

Please either roll back the inflight instant in a catch/finally (via CompactionUtil.rollbackCompaction(table, instantTime, txnManager)) before rethrowing, or document very clearly that recovery is delegated to the next round (and add an integration test that exercises it).

for (String compactionInstant : compactionInstants) {
HudiWriteStats writeStats = compactionWriteStats.get(compactionInstant);

if (writeStats.getTotalErrorRecords() > 0 && !conf.get(FlinkOptions.IGNORE_FAILED)) {

@XuQianJin-Stars XuQianJin-Stars Jun 24, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

throw new IOException(...) here will reach HudiLakeCommitter#commitCompactionAndSchedule (line 228) which only logs a WARN and continues. The error-record check therefore has no real effect on the main commit. Either:

  • propagate the failure all the way up so the data commit also fails (preferred), or
  • remove this throw and only roll back, since the current behaviour is misleading.
    Otherwise users running with IGNORE_FAILED=false will silently get partial compaction results.

metaClient.getCommitsTimeline().filterCompletedInstants();
metaClient
.getCommitsAndCompactionTimeline()
.filterCompletedAndCompactionInstants();

@XuQianJin-Stars XuQianJin-Stars Jun 24, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The semantics of completedTimeline have been silently changed from 'completed commits' to 'completed commits + any compaction state'. HudiSplitPlanner.java:62 uses getCompletedTimeline().containsInstant(snapshotTime) to decide whether a snapshot is readable. With the new definition, an inflight or requested compaction instant will be reported as readable, which can lead a reader to plan splits over partial state.

Please keep completedTimeline strictly completed-only and introduce a separate accessor (e.g. getFileSystemViewTimeline()) for the file-system view that needs the broader timeline. The two consumers have different correctness requirements.

@@ -187,6 +189,7 @@ protected long createPkTable(

if (enableAutoCompaction) {
tableBuilder.property(ConfigOptions.TABLE_DATALAKE_AUTO_COMPACTION.key(), "true");

@XuQianJin-Stars XuQianJin-Stars Jun 24, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"hudi." is repeated in several places in this PR. Please extract a constant (the same prefix is implicitly defined elsewhere when Fluss forwards table options into Hudi config; reusing the existing constant would prevent drift).

isCompactionInstant(
timeline, instant)))
.isTrue();
}

@XuQianJin-Stars XuQianJin-Stars Jun 24, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A 1-minute retry timeout on checkHudiCompactionCommitted is generous; please reduce to 30s or expose as a constant. Also, isCompactionInstant reads commit metadata and may throw IOException; if it does, the current implementation propagates a RuntimeException and fails the retry. For a polling helper it would be more lenient to return false on read errors and let the next poll succeed.

return hudiTableInfo
.getFileSystemView()
.getLatestFileSlicesBeforeOrOn(partition, latestInstantTime, true)
.collect(Collectors.toList());

@XuQianJin-Stars XuQianJin-Stars Jun 24, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The MOR vs COW branches use different APIs (getLatestMergedFileSlicesBeforeOrOn vs getLatestFileSlicesBeforeOrOn(... , true)) - please add a one-line comment explaining why includeFileSliceBefore=true is needed for COW so future readers do not flip it back.

FlinkHudiTieringTestBase.beforeAll(FLUSS_CLUSTER_EXTENSION.getClientConfig());
}

@Test

@XuQianJin-Stars XuQianJin-Stars Jun 24, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Flipping enableAutoCompaction from false to true for the existing PK-tiering test removes coverage of the non-compaction path. Please keep the original test (auto-compaction off) and add a new test method for auto-compaction on; otherwise we lose a regression baseline.

@luoyuxia

Copy link
Copy Markdown
Contributor

@XuQianJin-Stars Do you still have comments?

@XuQianJin-Stars

Copy link
Copy Markdown
Contributor

@XuQianJin-Stars Do you still have comments?

nothing

@XuQianJin-Stars XuQianJin-Stars left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

@luoyuxia luoyuxia left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@fhan688 @XuQianJin-Stars Thanks all. Merging..

@luoyuxia luoyuxia merged commit 79b648c into apache:main Jun 24, 2026
7 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants