Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ private List<HudiSplit> planPartition(
.getLatestMergedFileSlicesBeforeOrOn(partitionPath, snapshotTime)
.collect(Collectors.toList());
for (FileSlice fileSlice : fileSlices) {
// Pending MOR compaction plans can expose empty file slices; they are not readable
// at the requested completed instant.
if (fileSlice.isEmpty()) {

@XuQianJin-Stars XuQianJin-Stars Jun 24, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Skipping empty file slices is correct for MOR, but please add a short comment explaining why we can encounter an empty slice here (e.g. a pending compaction has scheduled a new file group but no base file has been written yet). Without that context the next reader will likely wonder if this is hiding a bug.

continue;
}
splits.add(toHudiSplit(hudiTableInfo, partitionPath, fileSlice));
}
return splits;
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -27,24 +27,26 @@

import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.util.CompactionUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import static org.apache.fluss.lake.writer.LakeTieringFactory.FLUSS_LAKE_TIERING_COMMIT_USER;

Expand All @@ -58,6 +60,7 @@ public class HudiLakeCommitter implements LakeCommitter<HudiWriteResult, HudiCom
private final HudiWriteTableInfo hudiTableInfo;
private final HoodieFlinkWriteClient<?> writeClient;
private final CkpMetadata ckpMetadata;
private final HudiCompactionService compactionService;

public HudiLakeCommitter(
HudiCatalogProvider hudiCatalogProvider,
Expand All @@ -67,10 +70,21 @@ public HudiLakeCommitter(
this.hudiTableInfo = HudiWriteTableInfo.create(hudiCatalogProvider, tablePath);
this.writeClient = hudiTableInfo.getWriteClient();
this.ckpMetadata = ckpMetadataProvider.get(tablePath, hudiTableInfo);
this.compactionService = HudiCompactionService.forScheduler(hudiTableInfo);
LOG.info(
"Created HudiLakeCommitter with configuration {}.", hudiTableInfo.getFlinkConfig());
}

/**
 * Creates a committer from collaborators that have already been constructed.
 *
 * <p>Unlike the public constructor, nothing is looked up or created here: the given
 * instances are stored as-is and the write client is taken from {@code hudiTableInfo}.
 * NOTE(review): package-private visibility suggests this exists for tests that inject a
 * custom {@code HudiCompactionService} - confirm against callers.
 *
 * @param hudiTableInfo table info that also supplies the Hudi write client
 * @param ckpMetadata metadata handle on which committed instants are recorded
 * @param compactionService service used to commit and schedule compactions
 */
HudiLakeCommitter(
        HudiWriteTableInfo hudiTableInfo,
        CkpMetadata ckpMetadata,
        HudiCompactionService compactionService) {
    this.compactionService = compactionService;
    this.ckpMetadata = ckpMetadata;
    this.hudiTableInfo = hudiTableInfo;
    // The write client is owned by the table info; keep a direct reference for convenience.
    this.writeClient = hudiTableInfo.getWriteClient();
}

@Override
public HudiCommittable toCommittable(List<HudiWriteResult> hudiWriteResults) {
HudiCommittable.Builder committableBuilder = HudiCommittable.builder();
Expand All @@ -85,8 +99,6 @@ public HudiCommittable toCommittable(List<HudiWriteResult> hudiWriteResults) {
public LakeCommitResult commit(
HudiCommittable committable, Map<String, String> snapshotProperties)
throws IOException {
ensureNoCompactionWriteStats(committable);

Map<String, HudiWriteStats> writeStatsByInstant = committable.getWriteStats();
if (writeStatsByInstant.size() != 1) {
throw new IOException(
Expand Down Expand Up @@ -129,6 +141,7 @@ public LakeCommitResult commit(

ckpMetadata.commitInstant(instant);
LOG.info("Committed Hudi instant {} successfully.", instant);
commitCompactionAndSchedule(committable.getCompactionWriteStats());
return LakeCommitResult.committedIsReadable(parseSnapshotId(instant));
} catch (Exception e) {
if (e instanceof IOException) {
Expand All @@ -140,10 +153,21 @@ public LakeCommitResult commit(

@Override
public void abort(HudiCommittable committable) throws IOException {
Set<String> instants = new LinkedHashSet<>(committable.getWriteStats().keySet());
instants.addAll(committable.getCompactionWriteStats().keySet());
IOException failure = null;
for (String instant : instants) {
for (String instant : committable.getCompactionWriteStats().keySet()) {
try {
abortCompactionInstant(instant);
LOG.info("Aborted Hudi compaction instant {}.", instant);
} catch (IOException e) {
failure =
addSuppressed(
failure,
new IOException(
"Failed to abort Hudi compaction instant " + instant + ".",
e));
}
}
for (String instant : committable.getWriteStats().keySet()) {
try {
abortInstant(instant);
LOG.info("Aborted Hudi instant {}.", instant);
Expand All @@ -160,17 +184,6 @@ public void abort(HudiCommittable committable) throws IOException {
}
}

/**
 * Rejects committables that carry compaction write stats.
 *
 * @param committable the committable whose compaction write stats are inspected
 * @throws IOException if at least one compaction instant is present; the message lists the
 *     offending instants
 */
private static void ensureNoCompactionWriteStats(HudiCommittable committable)
        throws IOException {
    Set<String> compactionInstants = committable.getCompactionWriteStats().keySet();
    if (compactionInstants.isEmpty()) {
        // Nothing compaction-related was produced, so the committable is acceptable.
        return;
    }
    throw new IOException(
            "Hudi compaction write stats are not supported yet, but got instants "
                    + compactionInstants
                    + ".");
}

@Nullable
@Override
public CommittedLakeSnapshot getMissingLakeSnapshot(@Nullable Long latestLakeSnapshotIdOfFluss)
Expand Down Expand Up @@ -207,6 +220,34 @@ public void close() throws Exception {
IOUtils.closeQuietly(hudiTableInfo, "hudi table info");
}

/**
 * Commits the compaction results carried by a committable and then schedules the next
 * compaction.
 *
 * <p>When auto compaction is disabled the method does nothing, except that it logs a warning
 * if compaction write stats were nevertheless handed in, so that a mismatch between the
 * component producing the stats and this committer is visible instead of being dropped
 * silently.
 *
 * <p>Exceptions raised while committing the compaction propagate to the caller. Failures
 * while scheduling the next compaction (or marking it inflight) are only logged, so they do
 * not fail the surrounding commit.
 *
 * @param compactionWriteStats compaction write stats keyed by compaction instant
 * @throws IOException if committing the compaction fails
 */
private void commitCompactionAndSchedule(Map<String, HudiWriteStats> compactionWriteStats)
        throws IOException {
    if (!isAutoCompactionEnabled()) {
        if (compactionWriteStats != null && !compactionWriteStats.isEmpty()) {
            // These stats will never be committed by this committer; surface that fact so
            // the corresponding compaction instants can be diagnosed.
            LOG.warn(
                    "Ignoring Hudi compaction write stats for instants {} of table {} "
                            + "because auto compaction is disabled.",
                    compactionWriteStats.keySet(),
                    hudiTableInfo.getTablePath());
        }
        return;
    }

    String latestCommittedInstant =
            compactionService.commitCompaction(compactionWriteStats, Collections.emptyMap());
    if (latestCommittedInstant != null) {
        LOG.info("Committed Hudi compaction instant {}.", latestCommittedInstant);
    }

    try {
        compactionService.scheduleCompaction();
        compactionService.markSelectedCompactionsInflight();
    } catch (Exception e) {
        // Scheduling is best-effort: the data commit has already succeeded at this point.
        LOG.warn(
                "Failed to schedule Hudi compaction for table {}.",
                hudiTableInfo.getTablePath(),
                e);
    }
}

private boolean isAutoCompactionEnabled() {
return hudiTableInfo.getTableType() == HoodieTableType.MERGE_ON_READ
&& hudiTableInfo.getFlinkConfig().get(FlinkOptions.COMPACTION_SCHEDULE_ENABLED);

@XuQianJin-Stars XuQianJin-Stars Jun 24, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This uses Hudi's own FlinkOptions.COMPACTION_SCHEDULE_ENABLED whereas HudiLakeWriter#shouldRunCompaction (line 174) uses Fluss-side tableInfo.getTableConfig().isDataLakeAutoCompaction(). The two switches must agree, otherwise the writer can run compaction execution and produce write stats that the committer then refuses to commit (or vice versa).

Please unify on a single source of truth. Easiest path: in HudiWriteTableInfo.create, set COMPACTION_SCHEDULE_ENABLED from tableConfig.isDataLakeAutoCompaction(); or in this method also check the Fluss flag.

}

private void validateWriteStats(String instant, HudiWriteStats writeStats) {
long totalErrorRecords = writeStats.getTotalErrorRecords();
if (totalErrorRecords > 0
Expand Down Expand Up @@ -250,6 +291,16 @@ private void abortInstant(String instant) throws IOException {
}
}

private void abortCompactionInstant(String instant) throws IOException {
try {
hudiTableInfo.getMetaClient().reloadActiveTimeline();
CompactionUtil.rollbackCompaction(

@XuQianJin-Stars XuQianJin-Stars Jun 24, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

writeClient.getHoodieTable() is cached against the last refresh; if abort is invoked before any reloadActiveTimeline() since the inflight compaction was created, the rollback target may be stale. Add hudiTableInfo.getMetaClient().reloadActiveTimeline() immediately before this call (similar to what getCompletedTimelineCommittedBy does).

writeClient.getHoodieTable(), instant, writeClient.getTransactionManager());
} catch (Exception e) {
throw new IOException("Failed to rollback Hudi compaction instant " + instant + ".", e);
}
}

private HoodieTimeline getCompletedTimelineCommittedBy(String commitUser) throws IOException {
hudiTableInfo.getMetaClient().reloadActiveTimeline();
HoodieTimeline timeline =
Expand All @@ -274,7 +325,9 @@ private static boolean isCommittedBy(
throw new IOException("Failed to load committed Hudi instant metadata.");
}
Map<String, String> extraMetadata = metadata.getExtraMetadata();
return extraMetadata != null && commitUser.equals(extraMetadata.get(COMMITTER_USER));
return metadata.getOperationType() != WriteOperationType.COMPACT
&& extraMetadata != null
&& commitUser.equals(extraMetadata.get(COMMITTER_USER));
} catch (IOException e) {
// a read failure must not be silently treated as "not committed by Fluss",
// otherwise we may miss an already-tiered snapshot and re-commit duplicated data
Expand Down
Loading