From 2882e8e31e0296dc4dbe7d562405dd88e834d990 Mon Sep 17 00:00:00 2001 From: fhan Date: Wed, 24 Jun 2026 10:12:32 +0800 Subject: [PATCH 1/3] [lake/hudi] Introduce Hudi compaction in tiering service --- .../lake/hudi/source/HudiSplitPlanner.java | 3 + .../hudi/tiering/HudiCompactionService.java | 464 ++++++++++++++++++ .../lake/hudi/tiering/HudiLakeCommitter.java | 91 +++- .../lake/hudi/tiering/HudiLakeWriter.java | 105 +++- .../fluss/lake/hudi/utils/HudiTableInfo.java | 4 +- .../testutils/FlinkHudiTieringTestBase.java | 70 ++- .../lake/hudi/tiering/HudiTieringITCase.java | 11 +- .../lake/hudi/tiering/HudiTieringTest.java | 7 +- 8 files changed, 722 insertions(+), 33 deletions(-) create mode 100644 fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/tiering/HudiCompactionService.java diff --git a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/source/HudiSplitPlanner.java b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/source/HudiSplitPlanner.java index b57c984b06..8032742cb5 100644 --- a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/source/HudiSplitPlanner.java +++ b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/source/HudiSplitPlanner.java @@ -119,6 +119,9 @@ private List planPartition( .getLatestMergedFileSlicesBeforeOrOn(partitionPath, snapshotTime) .collect(Collectors.toList()); for (FileSlice fileSlice : fileSlices) { + if (fileSlice.isEmpty()) { + continue; + } splits.add(toHudiSplit(hudiTableInfo, partitionPath, fileSlice)); } return splits; diff --git a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/tiering/HudiCompactionService.java b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/tiering/HudiCompactionService.java new file mode 100644 index 0000000000..5c24dd075b --- /dev/null +++ b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/tiering/HudiCompactionService.java @@ -0,0 +1,464 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.lake.hudi.tiering; + +import org.apache.fluss.metadata.TableBucket; + +import org.apache.flink.configuration.Configuration; +import org.apache.hudi.avro.model.HoodieCompactionOperation; +import org.apache.hudi.avro.model.HoodieCompactionPlan; +import org.apache.hudi.client.HoodieFlinkWriteClient; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.data.HoodieListData; +import org.apache.hudi.common.engine.HoodieReaderContext; +import org.apache.hudi.common.model.CompactionOperation; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.timeline.InstantGenerator; +import org.apache.hudi.common.util.CompactionUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.configuration.FlinkOptions; +import 
org.apache.hudi.exception.HoodieException; +import org.apache.hudi.index.bucket.BucketIdentifier; +import org.apache.hudi.sink.compact.FlinkCompactionConfig; +import org.apache.hudi.sink.compact.strategy.CompactionPlanStrategies; +import org.apache.hudi.storage.StorageConfiguration; +import org.apache.hudi.table.HoodieFlinkTable; +import org.apache.hudi.table.action.compact.CompactHelpers; +import org.apache.hudi.table.action.compact.HoodieFlinkMergeOnReadTableCompactor; +import org.apache.hudi.table.format.FlinkRowDataReaderContext; +import org.apache.hudi.table.format.InternalSchemaManager; +import org.apache.hudi.util.CompactionUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +import static org.apache.fluss.utils.Preconditions.checkNotNull; + +/** Coordinates Hudi compaction scheduling, execution, and commit for tiering writers. 
*/ +public class HudiCompactionService { + + private static final Logger LOG = LoggerFactory.getLogger(HudiCompactionService.class); + + private final HudiWriteTableInfo hudiTableInfo; + private final HoodieFlinkWriteClient writeClient; + private final HoodieTableMetaClient metaClient; + private final HoodieFlinkTable table; + private final Configuration conf; + @Nullable private final TableBucket tableBucket; + @Nullable private final String partition; + + private InternalSchemaManager internalSchemaManager; + + public HudiCompactionService( + HudiWriteTableInfo hudiTableInfo, + @Nullable TableBucket tableBucket, + @Nullable String partition) { + this.hudiTableInfo = checkNotNull(hudiTableInfo, "Hudi write table info must not be null."); + this.writeClient = hudiTableInfo.getWriteClient(); + this.metaClient = hudiTableInfo.getMetaClient(); + this.table = writeClient.getHoodieTable(); + this.conf = hudiTableInfo.getFlinkConfig(); + this.tableBucket = tableBucket; + this.partition = partition; + } + + public boolean scheduleCompaction() { + LOG.info("Scheduling Hudi compaction for table {}.", hudiTableInfo.getTablePath()); + metaClient.reloadActiveTimeline(); + try { + boolean scheduled = writeClient.scheduleCompaction(Option.empty()).isPresent(); + metaClient.reloadActiveTimeline(); + if (!scheduled) { + LOG.info( + "No Hudi compaction plan was scheduled for table {}.", + hudiTableInfo.getTablePath()); + } + return scheduled; + } catch (Exception e) { + LOG.warn( + "Failed to schedule Hudi compaction for table {}.", + hudiTableInfo.getTablePath(), + e); + return false; + } + } + + public void markSelectedCompactionsInflight() { + List compactionInstantTimes = getSelectedCompactionInstantTimes(); + if (compactionInstantTimes.isEmpty()) { + return; + } + + HoodieTimeline pendingCompactionTimeline = + table.getActiveTimeline().filterPendingCompactionTimeline(); + InstantGenerator instantGenerator = table.getInstantGenerator(); + + for (String timestamp : 
compactionInstantTimes) { + HoodieInstant inflightInstant = + instantGenerator.getCompactionInflightInstant(timestamp); + if (pendingCompactionTimeline.containsInstant(inflightInstant)) { + LOG.info("Rollback stale inflight Hudi compaction instant {}.", timestamp); + table.rollbackInflightCompaction( + inflightInstant, writeClient.getTransactionManager()); + metaClient.reloadActiveTimeline(); + } + } + + pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline(); + for (String timestamp : compactionInstantTimes) { + HoodieInstant requestedInstant = + instantGenerator.getCompactionRequestedInstant(timestamp); + if (pendingCompactionTimeline.containsInstant(requestedInstant)) { + table.getActiveTimeline().transitionCompactionRequestedToInflight(requestedInstant); + LOG.info("Marked Hudi compaction instant {} as inflight.", timestamp); + } + } + metaClient.reloadActiveTimeline(); + } + + public List getInflightCompactionInstantTimes() { + List compactionInstantTimes = getSelectedCompactionInstantTimes(); + if (compactionInstantTimes.isEmpty()) { + return Collections.emptyList(); + } + + HoodieTimeline pendingCompactionTimeline = + metaClient.getActiveTimeline().filterPendingCompactionTimeline(); + InstantGenerator instantGenerator = table.getInstantGenerator(); + + List inflightCompactionInstantTimes = new ArrayList<>(); + for (String timestamp : compactionInstantTimes) { + HoodieInstant inflightInstant = + instantGenerator.getCompactionInflightInstant(timestamp); + if (pendingCompactionTimeline.containsInstant(inflightInstant)) { + inflightCompactionInstantTimes.add(inflightInstant.requestedTime()); + } + } + LOG.info( + "Found {} inflight Hudi compaction instants for table {}.", + inflightCompactionInstantTimes.size(), + hudiTableInfo.getTablePath()); + return inflightCompactionInstantTimes; + } + + public List> getCompactionPlans( + List compactionInstantTimes) { + if (compactionInstantTimes == null || compactionInstantTimes.isEmpty()) 
{ + return Collections.emptyList(); + } + + List> compactionPlans = + compactionInstantTimes.stream() + .map( + timestamp -> { + try { + return Pair.of( + timestamp, + CompactionUtils.getCompactionPlan( + metaClient, timestamp)); + } catch (Exception e) { + throw new HoodieException( + "Failed to get Hudi compaction plan " + timestamp, + e); + } + }) + .filter(pair -> isValidCompactionPlan(pair.getRight())) + .collect(Collectors.toList()); + + LOG.info( + "Loaded {} Hudi compaction plans for table {}.", + compactionPlans.size(), + hudiTableInfo.getTablePath()); + return compactionPlans; + } + + public Map> executeCompaction( + List> compactionPlans) throws IOException { + if (compactionPlans == null || compactionPlans.isEmpty()) { + return Collections.emptyMap(); + } + TableBucket currentBucket = + checkNotNull(tableBucket, "Hudi compaction execution requires a table bucket."); + + Map> writeStatusesByInstant = new HashMap<>(); + HoodieWriteConfig writeConfig = writeClient.getConfig(); + String hudiPartitionPath = toHudiPartitionPath(partition); + + for (Pair planPair : compactionPlans) { + String instantTime = planPair.getLeft(); + HoodieCompactionPlan compactionPlan = planPair.getRight(); + for (HoodieCompactionOperation operation : compactionPlan.getOperations()) { + if (!belongsToCurrentWriter(operation, hudiPartitionPath, currentBucket)) { + continue; + } + + HoodieFlinkMergeOnReadTableCompactor compactor = + new HoodieFlinkMergeOnReadTableCompactor<>(); + CompactionOperation compactionOperation = + CompactionOperation.convertFromAvroRecordInstance(operation); + + metaClient.reload(); + try { + CompactionUtil.setAvroSchema(writeConfig, metaClient); + List writeStatuses = + compactor.compact( + writeConfig, + compactionOperation, + instantTime, + table.getTaskContextSupplier(), + createReaderContext(true), + table); + writeStatusesByInstant + .computeIfAbsent(instantTime, ignored -> new ArrayList<>()) + .addAll(writeStatuses); + LOG.info( + "Compacted Hudi file 
id {} for table {}, partition {}, bucket {}, instant {}.", + operation.getFileId(), + hudiTableInfo.getTablePath(), + hudiPartitionPath, + currentBucket.getBucket(), + instantTime); + } catch (Exception e) { + throw new IOException( + String.format( + "Failed to execute Hudi compaction for table %s, partition %s, bucket %s, instant %s.", + hudiTableInfo.getTablePath(), + hudiPartitionPath, + currentBucket.getBucket(), + instantTime), + e); + } + } + } + return writeStatusesByInstant; + } + + public String commitCompaction( + Map compactionWriteStats, + Map snapshotProperties) + throws IOException { + if (compactionWriteStats == null || compactionWriteStats.isEmpty()) { + return null; + } + + String latestInstant = null; + for (Map.Entry entry : compactionWriteStats.entrySet()) { + String compactionInstant = entry.getKey(); + HudiWriteStats writeStats = entry.getValue(); + + if (writeStats.getTotalErrorRecords() > 0 && !conf.get(FlinkOptions.IGNORE_FAILED)) { + LOG.warn( + "Rollback Hudi compaction instant {} because it contains {} error records.", + compactionInstant, + writeStats.getTotalErrorRecords()); + CompactionUtil.rollbackCompaction( + table, compactionInstant, writeClient.getTransactionManager()); + throw new IOException( + "Failed to commit Hudi compaction instant " + + compactionInstant + + " because it contains " + + writeStats.getTotalErrorRecords() + + " error records."); + } + + doCommitCompaction(compactionInstant, writeStats, snapshotProperties); + if (latestInstant == null || compactionInstant.compareTo(latestInstant) > 0) { + latestInstant = compactionInstant; + } + LOG.info("Committed Hudi compaction instant {}.", compactionInstant); + } + return latestInstant; + } + + private List getSelectedCompactionInstantTimes() { + metaClient.reloadActiveTimeline(); + HoodieTimeline pendingCompactionTimeline = + table.getActiveTimeline().filterPendingCompactionTimeline(); + FlinkCompactionConfig compactionConfig = toFlinkCompactionConfig(conf); + List 
requestedInstants = + CompactionPlanStrategies.getStrategy(compactionConfig) + .select(pendingCompactionTimeline); + if (requestedInstants.isEmpty()) { + LOG.info( + "No pending Hudi compaction plan found for table {}.", + hudiTableInfo.getTablePath()); + return Collections.emptyList(); + } + return requestedInstants.stream() + .map(HoodieInstant::requestedTime) + .collect(Collectors.toList()); + } + + private HoodieReaderContext createReaderContext(boolean needReloadMetaClient) { + Supplier internalSchemaManagerSupplier = + () -> { + if (internalSchemaManager == null || needReloadMetaClient) { + internalSchemaManager = + InternalSchemaManager.get(metaClient.getStorageConf(), metaClient); + } + return internalSchemaManager; + }; + StorageConfiguration readerConf = writeClient.getEngineContext().getStorageConf(); + return new FlinkRowDataReaderContext( + readerConf, + internalSchemaManagerSupplier, + Collections.emptyList(), + metaClient.getTableConfig(), + Option.empty()); + } + + private void doCommitCompaction( + String instant, HudiWriteStats writeStats, Map snapshotProperties) + throws IOException { + HoodieCommitMetadata metadata = + CompactHelpers.getInstance() + .createCompactionMetadata( + table, + instant, + HoodieListData.eager(toWriteStatuses(writeStats)), + writeClient.getConfig().getSchema()); + snapshotProperties.forEach(metadata::addMetadata); + + writeClient.completeCompaction(metadata, table, instant); + if (!conf.get(FlinkOptions.CLEAN_ASYNC_ENABLED)) { + writeClient.clean(); + } + metaClient.reloadActiveTimeline(); + } + + private static boolean belongsToCurrentWriter( + HoodieCompactionOperation operation, + String hudiPartitionPath, + TableBucket currentBucket) { + return belongsToCurrentPartition(operation, hudiPartitionPath) + && belongsToCurrentBucket(operation, currentBucket); + } + + private static boolean belongsToCurrentPartition( + HoodieCompactionOperation operation, String hudiPartitionPath) { + String operationPartitionPath = 
normalizePartitionPath(operation.getPartitionPath()); + return Objects.equals(operationPartitionPath, hudiPartitionPath); + } + + private static boolean belongsToCurrentBucket( + HoodieCompactionOperation operation, TableBucket currentBucket) { + String fileId = operation.getFileId(); + if (fileId == null || fileId.isEmpty()) { + return false; + } + try { + return BucketIdentifier.bucketIdFromFileId(fileId) == currentBucket.getBucket(); + } catch (RuntimeException e) { + return fileId.contains(BucketIdentifier.bucketIdStr(currentBucket.getBucket())); + } + } + + private String toHudiPartitionPath(@Nullable String partitionName) { + if (partitionName == null || partitionName.isEmpty()) { + return ""; + } + + String partitionPath = partitionName.replace('$', '/'); + if (!conf.get(FlinkOptions.HIVE_STYLE_PARTITIONING)) { + return partitionPath; + } + + String partitionFields = conf.get(FlinkOptions.PARTITION_PATH_FIELD); + if (partitionFields == null || partitionFields.trim().isEmpty()) { + return partitionPath; + } + + String[] fields = partitionFields.split(","); + String[] values = partitionName.split("\\$"); + if (fields.length != values.length) { + return partitionPath; + } + + List hiveStylePartitionSegments = new ArrayList<>(fields.length); + for (int i = 0; i < fields.length; i++) { + hiveStylePartitionSegments.add(fields[i].trim() + "=" + values[i]); + } + return String.join("/", hiveStylePartitionSegments); + } + + private static String normalizePartitionPath(@Nullable String partitionPath) { + return partitionPath == null ? 
"" : partitionPath; + } + + private static List toWriteStatuses(HudiWriteStats writeStats) { + List stats = writeStats.getWriteStats(); + if (stats.isEmpty()) { + return Collections.emptyList(); + } + + List writeStatuses = new ArrayList<>(stats.size()); + for (HoodieWriteStat stat : stats) { + WriteStatus writeStatus = new WriteStatus(); + writeStatus.setStat(stat); + writeStatus.setFileId(stat.getFileId()); + writeStatus.setPartitionPath(stat.getPartitionPath()); + writeStatuses.add(writeStatus); + } + writeStatuses.get(0).setTotalErrorRecords(writeStats.getTotalErrorRecords()); + return writeStatuses; + } + + private static boolean isValidCompactionPlan(HoodieCompactionPlan plan) { + return plan != null && plan.getOperations() != null && !plan.getOperations().isEmpty(); + } + + private static FlinkCompactionConfig toFlinkCompactionConfig(Configuration config) { + FlinkCompactionConfig compactionConfig = new FlinkCompactionConfig(); + + compactionConfig.path = config.get(FlinkOptions.PATH); + compactionConfig.compactionTriggerStrategy = + config.get(FlinkOptions.COMPACTION_TRIGGER_STRATEGY); + compactionConfig.archiveMaxCommits = config.get(FlinkOptions.ARCHIVE_MAX_COMMITS); + compactionConfig.archiveMinCommits = config.get(FlinkOptions.ARCHIVE_MIN_COMMITS); + compactionConfig.cleanPolicy = config.get(FlinkOptions.CLEAN_POLICY); + compactionConfig.cleanRetainCommits = config.get(FlinkOptions.CLEAN_RETAIN_COMMITS); + compactionConfig.cleanRetainHours = config.get(FlinkOptions.CLEAN_RETAIN_HOURS); + compactionConfig.cleanRetainFileVersions = + config.get(FlinkOptions.CLEAN_RETAIN_FILE_VERSIONS); + compactionConfig.compactionDeltaCommits = config.get(FlinkOptions.COMPACTION_DELTA_COMMITS); + compactionConfig.compactionDeltaSeconds = config.get(FlinkOptions.COMPACTION_DELTA_SECONDS); + compactionConfig.compactionMaxMemory = config.get(FlinkOptions.COMPACTION_MAX_MEMORY); + compactionConfig.compactionTargetIo = config.get(FlinkOptions.COMPACTION_TARGET_IO); + 
compactionConfig.compactionTasks = config.get(FlinkOptions.COMPACTION_TASKS); + compactionConfig.cleanAsyncEnable = config.get(FlinkOptions.CLEAN_ASYNC_ENABLED); + compactionConfig.schedule = config.get(FlinkOptions.COMPACTION_SCHEDULE_ENABLED); + return compactionConfig; + } +} diff --git a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/tiering/HudiLakeCommitter.java b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/tiering/HudiLakeCommitter.java index 44d898e29f..bd6ba0e39c 100644 --- a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/tiering/HudiLakeCommitter.java +++ b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/tiering/HudiLakeCommitter.java @@ -27,11 +27,13 @@ import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.util.CompactionUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,11 +42,9 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.util.HashMap; -import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Set; import static org.apache.fluss.lake.writer.LakeTieringFactory.FLUSS_LAKE_TIERING_COMMIT_USER; @@ -58,6 +58,7 @@ public class HudiLakeCommitter implements LakeCommitter writeClient; private final CkpMetadata ckpMetadata; + private final HudiCompactionService compactionService; public HudiLakeCommitter( HudiCatalogProvider hudiCatalogProvider, @@ -67,6 +68,7 @@ public HudiLakeCommitter( this.hudiTableInfo = HudiWriteTableInfo.create(hudiCatalogProvider, tablePath); 
this.writeClient = hudiTableInfo.getWriteClient(); this.ckpMetadata = ckpMetadataProvider.get(tablePath, hudiTableInfo); + this.compactionService = new HudiCompactionService(hudiTableInfo, null, null); LOG.info( "Created HudiLakeCommitter with configuration {}.", hudiTableInfo.getFlinkConfig()); } @@ -85,8 +87,6 @@ public HudiCommittable toCommittable(List hudiWriteResults) { public LakeCommitResult commit( HudiCommittable committable, Map snapshotProperties) throws IOException { - ensureNoCompactionWriteStats(committable); - Map writeStatsByInstant = committable.getWriteStats(); if (writeStatsByInstant.size() != 1) { throw new IOException( @@ -129,7 +129,12 @@ public LakeCommitResult commit( ckpMetadata.commitInstant(instant); LOG.info("Committed Hudi instant {} successfully.", instant); - return LakeCommitResult.committedIsReadable(parseSnapshotId(instant)); + String latestCommittedInstant = + commitCompactionAndSchedule( + committable.getCompactionWriteStats(), commitMetadata); + return LakeCommitResult.committedIsReadable( + parseSnapshotId( + latestCommittedInstant == null ? 
instant : latestCommittedInstant)); } catch (Exception e) { if (e instanceof IOException) { throw (IOException) e; @@ -140,10 +145,8 @@ public LakeCommitResult commit( @Override public void abort(HudiCommittable committable) throws IOException { - Set instants = new LinkedHashSet<>(committable.getWriteStats().keySet()); - instants.addAll(committable.getCompactionWriteStats().keySet()); IOException failure = null; - for (String instant : instants) { + for (String instant : committable.getWriteStats().keySet()) { try { abortInstant(instant); LOG.info("Aborted Hudi instant {}.", instant); @@ -155,22 +158,24 @@ public void abort(HudiCommittable committable) throws IOException { "Failed to abort Hudi instant " + instant + ".", e)); } } + for (String instant : committable.getCompactionWriteStats().keySet()) { + try { + abortCompactionInstant(instant); + LOG.info("Aborted Hudi compaction instant {}.", instant); + } catch (IOException e) { + failure = + addSuppressed( + failure, + new IOException( + "Failed to abort Hudi compaction instant " + instant + ".", + e)); + } + } if (failure != null) { throw failure; } } - private static void ensureNoCompactionWriteStats(HudiCommittable committable) - throws IOException { - Map compactionWriteStats = committable.getCompactionWriteStats(); - if (!compactionWriteStats.isEmpty()) { - throw new IOException( - "Hudi compaction write stats are not supported yet, but got instants " - + compactionWriteStats.keySet() - + "."); - } - } - @Nullable @Override public CommittedLakeSnapshot getMissingLakeSnapshot(@Nullable Long latestLakeSnapshotIdOfFluss) @@ -207,6 +212,45 @@ public void close() throws Exception { IOUtils.closeQuietly(hudiTableInfo, "hudi table info"); } + @Nullable + private String commitCompactionAndSchedule( + Map compactionWriteStats, Map commitMetadata) { + if (!isAutoCompactionEnabled()) { + return null; + } + + String latestCommittedInstant = null; + try { + latestCommittedInstant = + 
compactionService.commitCompaction(compactionWriteStats, commitMetadata); + if (latestCommittedInstant != null) { + LOG.info("Committed Hudi compaction instant {}.", latestCommittedInstant); + } + } catch (Exception e) { + LOG.warn( + "Failed to commit Hudi compaction for table {}. " + + "The next tiering round can rollback and retry pending compactions.", + hudiTableInfo.getTablePath(), + e); + } + + try { + compactionService.scheduleCompaction(); + compactionService.markSelectedCompactionsInflight(); + } catch (Exception e) { + LOG.warn( + "Failed to schedule Hudi compaction for table {}.", + hudiTableInfo.getTablePath(), + e); + } + return latestCommittedInstant; + } + + private boolean isAutoCompactionEnabled() { + return hudiTableInfo.getTableType() == HoodieTableType.MERGE_ON_READ + && hudiTableInfo.getFlinkConfig().get(FlinkOptions.COMPACTION_SCHEDULE_ENABLED); + } + private void validateWriteStats(String instant, HudiWriteStats writeStats) { long totalErrorRecords = writeStats.getTotalErrorRecords(); if (totalErrorRecords > 0 @@ -250,6 +294,15 @@ private void abortInstant(String instant) throws IOException { } } + private void abortCompactionInstant(String instant) throws IOException { + try { + CompactionUtil.rollbackCompaction( + writeClient.getHoodieTable(), instant, writeClient.getTransactionManager()); + } catch (Exception e) { + throw new IOException("Failed to rollback Hudi compaction instant " + instant + ".", e); + } + } + private HoodieTimeline getCompletedTimelineCommittedBy(String commitUser) throws IOException { hudiTableInfo.getMetaClient().reloadActiveTimeline(); HoodieTimeline timeline = diff --git a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/tiering/HudiLakeWriter.java b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/tiering/HudiLakeWriter.java index 1b9c869a24..4abf2c1ec4 100644 --- a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/tiering/HudiLakeWriter.java +++ 
b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/tiering/HudiLakeWriter.java @@ -24,21 +24,30 @@ import org.apache.fluss.lake.writer.WriterInitContext; import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.record.LogRecord; +import org.apache.fluss.utils.concurrent.ExecutorThreadFactory; import org.apache.flink.configuration.Configuration; +import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.CommitUtils; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.configuration.FlinkOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import static org.apache.fluss.lake.writer.WriterInitContext.UNKNOWN_SPLIT_INDEX; import static org.apache.fluss.lake.writer.WriterInitContext.UNKNOWN_TIERING_ROUND_TIMESTAMP; @@ -53,6 +62,8 @@ public class HudiLakeWriter implements LakeWriter { private final TableInfo tableInfo; private final HudiWriteTableInfo hudiTableInfo; private final CkpMetadata ckpMetadata; + @Nullable private final ExecutorService compactionExecutor; + @Nullable private final CompletableFuture>> compactionFuture; public HudiLakeWriter( HudiCatalogProvider hudiCatalogProvider, @@ -75,6 +86,16 @@ public HudiLakeWriter( } this.recordWriter = new HudiRecordWriter(writerInitContext, hudiTableInfo, ckpMetadata); + if (shouldRunCompaction(writerInitContext)) { + this.compactionExecutor = + Executors.newSingleThreadExecutor( + new ExecutorThreadFactory( 
+ "hudi-compact-" + writerInitContext.tableBucket())); + this.compactionFuture = executeCompactionAsync(hudiCatalogProvider, writerInitContext); + } else { + this.compactionExecutor = null; + this.compactionFuture = null; + } LOG.info("Created HudiLakeWriter with configuration {}.", hudiTableInfo.getFlinkConfig()); } @@ -91,7 +112,11 @@ public void write(LogRecord record) throws IOException { public HudiWriteResult complete() throws IOException { try { Map> writeStatuses = recordWriter.complete(); - return HudiWriteResult.fromWriteStatuses(writeStatuses, Collections.emptyMap()); + Map> compactionWriteStatuses = Collections.emptyMap(); + if (compactionFuture != null) { + compactionWriteStatuses = compactionFuture.get(); + } + return HudiWriteResult.fromWriteStatuses(writeStatuses, compactionWriteStatuses); } catch (Exception e) { throw new IOException("Failed to complete Hudi write.", e); } @@ -99,14 +124,86 @@ public HudiWriteResult complete() throws IOException { @Override public void close() throws IOException { + IOException failure = closeCompactionExecutor(); + failure = close(failure, recordWriter, "Hudi record writer"); + failure = close(failure, ckpMetadata, "Hudi checkpoint metadata"); + if (failure != null) { + throw failure; + } + } + + @Nullable + private IOException closeCompactionExecutor() { + try { + if (compactionFuture != null && !compactionFuture.isDone()) { + compactionFuture.cancel(true); + } + if (compactionExecutor != null) { + compactionExecutor.shutdownNow(); /* cancel(true) cannot interrupt the task */ + if (!compactionExecutor.awaitTermination(30, TimeUnit.SECONDS)) { + LOG.warn("Failed to close Hudi compaction executor."); + } + } + return null; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return new IOException("Interrupted while closing Hudi compaction executor.", e); + } catch (Exception e) { + return new IOException("Failed to close Hudi compaction executor.", e); + } + } + + @Nullable + private static IOException close( + @Nullable IOException 
failure, AutoCloseable closeable, String resourceName) { try { - recordWriter.close(); - ckpMetadata.close(); + closeable.close(); + return failure; } catch (Exception e) { - throw new IOException("Failed to close HudiLakeWriter.", e); + IOException closeException = + new IOException("Failed to close " + resourceName + ".", e); + if (failure == null) { + return closeException; + } + failure.addSuppressed(closeException); + return failure; } } + private boolean shouldRunCompaction(WriterInitContext writerInitContext) { + return writerInitContext.tableInfo().getTableConfig().isDataLakeAutoCompaction() + && hudiTableInfo.getTableType() == HoodieTableType.MERGE_ON_READ; + } + + private CompletableFuture>> executeCompactionAsync( + HudiCatalogProvider hudiCatalogProvider, WriterInitContext writerInitContext) { + return CompletableFuture.supplyAsync( + () -> { + try (HudiWriteTableInfo compactionTableInfo = + HudiWriteTableInfo.create( + hudiCatalogProvider, writerInitContext.tablePath())) { + HudiCompactionService compactionService = + new HudiCompactionService( + compactionTableInfo, + writerInitContext.tableBucket(), + writerInitContext.partition()); + List instantTimes = + compactionService.getInflightCompactionInstantTimes(); + List> compactionPlans = + compactionService.getCompactionPlans(instantTimes); + return compactionService.executeCompaction(compactionPlans); + } catch (Exception e) { + LOG.warn( + "Failed to execute Hudi compaction for table {}, bucket {}.", + writerInitContext.tablePath(), + writerInitContext.tableBucket(), + e); + return Collections.emptyMap(); + } + }, + compactionExecutor); + } + private void initInstant(Configuration configuration, HoodieTableMetaClient metaClient) { metaClient.reloadActiveTimeline(); WriteOperationType writeOperationType = diff --git a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/HudiTableInfo.java 
b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/HudiTableInfo.java index cb819fab26..72b90d87fc 100644 --- a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/HudiTableInfo.java +++ b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/HudiTableInfo.java @@ -111,7 +111,9 @@ public static HudiTableInfo create(TablePath tablePath, Configuration hudiConfig HoodieTableFileSystemView fileSystemView = null; HoodieTableMetaClient metaClient = createMetaClient(basePath, hudiConfig); HoodieTimeline completedTimeline = - metaClient.getCommitsTimeline().filterCompletedInstants(); + metaClient + .getCommitsAndCompactionTimeline() + .filterCompletedAndCompactionInstants(); HoodieEngineContext engineContext = new HoodieLocalEngineContext(metaClient.getStorageConf()); try { diff --git a/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/testutils/FlinkHudiTieringTestBase.java b/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/testutils/FlinkHudiTieringTestBase.java index 14892fff03..b33896f22a 100644 --- a/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/testutils/FlinkHudiTieringTestBase.java +++ b/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/testutils/FlinkHudiTieringTestBase.java @@ -49,6 +49,8 @@ import org.apache.flink.table.data.RowData; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; @@ -187,6 +189,7 @@ protected long createPkTable( if (enableAutoCompaction) { tableBuilder.property(ConfigOptions.TABLE_DATALAKE_AUTO_COMPACTION.key(), "true"); + tableBuilder.customProperty("hudi." 
+ FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "1"); } return createTable(tablePath, tableBuilder.build()); } @@ -205,6 +208,7 @@ protected long createLogTable( if (enableAutoCompaction) { tableBuilder.property(ConfigOptions.TABLE_DATALAKE_AUTO_COMPACTION.key(), "true"); + tableBuilder.customProperty("hudi." + FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "1"); } return createTable(tablePath, tableBuilder.build()); } @@ -456,6 +460,30 @@ protected void checkFlussOffsetsInSnapshot( } } + protected void checkHudiCompactionCommitted(TablePath tablePath) { + retry( + Duration.ofMinutes(1), + () -> { + try (HudiTableInfo hudiTableInfo = + HudiTableInfo.create( + tablePath, Configuration.fromMap(getHudiCatalogConf()))) { + HoodieTimeline timeline = + hudiTableInfo + .getMetaClient() + .getActiveTimeline() + .getCommitsAndCompactionTimeline() + .filterCompletedInstants(); + assertThat( + timeline.getInstantsAsStream() + .anyMatch( + instant -> + isCompactionInstant( + timeline, instant))) + .isTrue(); + } + }); + } + private static String formatMORRow(InternalRow row) { return row.getBoolean(0) + "," @@ -492,12 +520,12 @@ private List collectHudiRows( int columnCount = avroSchema.getFields().size(); List fileSlices = - hudiTableInfo - .getFileSystemView() - .getLatestFileSlices(partition) - .collect(Collectors.toList()); + getLatestFileSlicesAtCompletedInstant(hudiTableInfo, partition); List records = new ArrayList<>(); for (FileSlice fileSlice : fileSlices) { + if (fileSlice.isEmpty()) { + continue; + } if (!fileSlice.getFileId().contains(BucketIdentifier.bucketIdStr(bucket))) { continue; } @@ -521,6 +549,31 @@ private List collectHudiRows( } } + private static List getLatestFileSlicesAtCompletedInstant( + HudiTableInfo hudiTableInfo, String partition) { + HoodieTimeline completedTimeline = hudiTableInfo.getCompletedTimeline(); + HoodieInstant latestInstant = + completedTimeline + .lastInstant() + .orElseThrow( + () -> + new IllegalStateException( + "No completed Hudi 
instant found for table " + + hudiTableInfo.getTablePath() + + ".")); + String latestInstantTime = latestInstant.requestedTime(); + if (hudiTableInfo.getTableType() == HoodieTableType.MERGE_ON_READ) { + return hudiTableInfo + .getFileSystemView() + .getLatestMergedFileSlicesBeforeOrOn(partition, latestInstantTime) + .collect(Collectors.toList()); + } + return hudiTableInfo + .getFileSystemView() + .getLatestFileSlicesBeforeOrOn(partition, latestInstantTime, true) + .collect(Collectors.toList()); + } + private org.apache.flink.configuration.Configuration buildFlinkHudiOptions( TablePath tablePath, HudiTableInfo hudiTableInfo, @@ -533,6 +586,15 @@ private org.apache.flink.configuration.Configuration buildFlinkHudiOptions( return org.apache.flink.configuration.Configuration.fromMap(hudiOptions); } + private static boolean isCompactionInstant(HoodieTimeline timeline, HoodieInstant instant) { + try { + HoodieCommitMetadata metadata = timeline.readCommitMetadata(instant); + return metadata != null && metadata.getOperationType() == WriteOperationType.COMPACT; + } catch (Exception e) { + throw new RuntimeException("Failed to read Hudi instant metadata " + instant + ".", e); + } + } + private interface HudiRowFormatter { String format(RowData row); } diff --git a/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/tiering/HudiTieringITCase.java b/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/tiering/HudiTieringITCase.java index 0f1acc0de7..176d135d19 100644 --- a/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/tiering/HudiTieringITCase.java +++ b/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/tiering/HudiTieringITCase.java @@ -97,7 +97,7 @@ protected static void beforeAll() { @Test void testTiering() throws Exception { TablePath pkTablePath = TablePath.of(DEFAULT_DB, "pkTable"); - long pkTableId = createPkTable(pkTablePath, 1, false, PK_SCHEMA); + long pkTableId = createPkTable(pkTablePath, 1, 
true, PK_SCHEMA); TableBucket pkTableBucket = new TableBucket(pkTableId, 0); List rows = Arrays.asList(pkRow(1, "v1"), pkRow(2, "v2"), pkRow(3, "v3")); @@ -121,6 +121,15 @@ void testTiering() throws Exception { checkDataInHudiMORTable(pkTablePath, "", rows, 0); checkFlussOffsetsInSnapshot(pkTablePath, Collections.singletonMap(pkTableBucket, 9L)); + rows = Arrays.asList(pkRow(1, "v1111"), pkRow(2, "v2222"), pkRow(3, "v3333")); + writeRows(pkTablePath, rows, false); + + // 3 current records + 3 delete records + 3 insert records. + assertReplicaStatus(pkTableBucket, 15); + checkDataInHudiMORTable(pkTablePath, "", rows, 0); + checkFlussOffsetsInSnapshot(pkTablePath, Collections.singletonMap(pkTableBucket, 15L)); + checkHudiCompactionCommitted(pkTablePath); + testPartitionedTableTiering(); } finally { jobClient.cancel().get(); diff --git a/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/tiering/HudiTieringTest.java b/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/tiering/HudiTieringTest.java index 815daf0695..00a654024e 100644 --- a/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/tiering/HudiTieringTest.java +++ b/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/tiering/HudiTieringTest.java @@ -130,9 +130,8 @@ tablePath, new TableBucket(1L, 0), tableInfo, 0, 0L))) { } @Test - void testRejectCompactionWriteStatusesOnCommit() throws Exception { - TablePath tablePath = - TablePath.of("hudi", "test_reject_compaction_write_statuses_on_commit"); + void testRejectMissingWriteStatsOnCommit() throws Exception { + TablePath tablePath = TablePath.of("hudi", "test_reject_missing_write_stats_on_commit"); TableDescriptor tableDescriptor = createLogTableDescriptor(); hudiLakeCatalog.createTable( tablePath, tableDescriptor, new TestingLakeCatalogContext(tableDescriptor)); @@ -150,7 +149,7 @@ void testRejectCompactionWriteStatusesOnCommit() throws Exception { assertThatThrownBy(() -> 
committer.commit(committable, Collections.emptyMap())) .isInstanceOf(IOException.class) - .hasMessageContaining("Hudi compaction write stats are not supported yet"); + .hasMessageContaining("Hudi write stats must contain exactly one instant"); } } From 45666c50e2f53afcb19bc7fea9eb1580a5bba1c2 Mon Sep 17 00:00:00 2001 From: fhan Date: Wed, 24 Jun 2026 11:06:41 +0800 Subject: [PATCH 2/3] [lake/hudi] refine code impl according to copilot review comments --- .../lake/hudi/tiering/HudiCompactionService.java | 7 ++++--- .../fluss/lake/hudi/tiering/HudiLakeCommitter.java | 11 +++++++++-- .../fluss/lake/hudi/tiering/HudiTieringTest.java | 14 ++++++++++++++ 3 files changed, 27 insertions(+), 5 deletions(-) diff --git a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/tiering/HudiCompactionService.java b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/tiering/HudiCompactionService.java index 5c24dd075b..daacf9c7ee 100644 --- a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/tiering/HudiCompactionService.java +++ b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/tiering/HudiCompactionService.java @@ -274,9 +274,10 @@ public String commitCompaction( } String latestInstant = null; - for (Map.Entry entry : compactionWriteStats.entrySet()) { - String compactionInstant = entry.getKey(); - HudiWriteStats writeStats = entry.getValue(); + List compactionInstants = new ArrayList<>(compactionWriteStats.keySet()); + Collections.sort(compactionInstants); + for (String compactionInstant : compactionInstants) { + HudiWriteStats writeStats = compactionWriteStats.get(compactionInstant); if (writeStats.getTotalErrorRecords() > 0 && !conf.get(FlinkOptions.IGNORE_FAILED)) { LOG.warn( diff --git a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/tiering/HudiLakeCommitter.java b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/tiering/HudiLakeCommitter.java index 
bd6ba0e39c..07398d02d7 100644 --- a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/tiering/HudiLakeCommitter.java +++ b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/tiering/HudiLakeCommitter.java @@ -133,8 +133,7 @@ public LakeCommitResult commit( commitCompactionAndSchedule( committable.getCompactionWriteStats(), commitMetadata); return LakeCommitResult.committedIsReadable( - parseSnapshotId( - latestCommittedInstant == null ? instant : latestCommittedInstant)); + parseSnapshotId(getLatestCommittedInstant(instant, latestCommittedInstant))); } catch (Exception e) { if (e instanceof IOException) { throw (IOException) e; @@ -251,6 +250,14 @@ private boolean isAutoCompactionEnabled() { && hudiTableInfo.getFlinkConfig().get(FlinkOptions.COMPACTION_SCHEDULE_ENABLED); } + static String getLatestCommittedInstant( + String dataCommitInstant, @Nullable String compactionInstant) { + if (compactionInstant == null || dataCommitInstant.compareTo(compactionInstant) >= 0) { + return dataCommitInstant; + } + return compactionInstant; + } + private void validateWriteStats(String instant, HudiWriteStats writeStats) { long totalErrorRecords = writeStats.getTotalErrorRecords(); if (totalErrorRecords > 0 diff --git a/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/tiering/HudiTieringTest.java b/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/tiering/HudiTieringTest.java index 00a654024e..58e6b8f215 100644 --- a/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/tiering/HudiTieringTest.java +++ b/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/tiering/HudiTieringTest.java @@ -153,6 +153,20 @@ void testRejectMissingWriteStatsOnCommit() throws Exception { } } + @Test + void testLatestCommittedInstantNeverGoesBackwards() { + assertThat(HudiLakeCommitter.getLatestCommittedInstant("20260624000200000", null)) + .isEqualTo("20260624000200000"); + assertThat( + 
HudiLakeCommitter.getLatestCommittedInstant( + "20260624000200000", "20260624000100000")) + .isEqualTo("20260624000200000"); + assertThat( + HudiLakeCommitter.getLatestCommittedInstant( + "20260624000200000", "20260624000300000")) + .isEqualTo("20260624000300000"); + } + private LakeCommitter createLakeCommitter( TablePath tablePath, TableInfo tableInfo) throws IOException { return hudiLakeTieringFactory.createLakeCommitter( From cfac933ab5133d87aa9846137ae497d4cca5ce90 Mon Sep 17 00:00:00 2001 From: fhan Date: Wed, 24 Jun 2026 18:43:13 +0800 Subject: [PATCH 3/3] [lake/hudi] refine entire code impl of tiering service to hudi according to review results by XuQianJin-Stars --- .../lake/hudi/source/HudiSplitPlanner.java | 2 + .../hudi/tiering/HudiCompactionService.java | 159 ++++++++++--- .../lake/hudi/tiering/HudiLakeCommitter.java | 77 +++--- .../lake/hudi/tiering/HudiLakeWriter.java | 125 ++++++++-- .../fluss/lake/hudi/utils/HudiTableInfo.java | 6 +- .../testutils/FlinkHudiTieringTestBase.java | 167 +++++++------ .../lake/hudi/tiering/HudiTieringITCase.java | 42 +++- .../lake/hudi/tiering/HudiTieringTest.java | 219 ++++++++++++++++-- 8 files changed, 602 insertions(+), 195 deletions(-) diff --git a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/source/HudiSplitPlanner.java b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/source/HudiSplitPlanner.java index 8032742cb5..7bc10e2dfd 100644 --- a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/source/HudiSplitPlanner.java +++ b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/source/HudiSplitPlanner.java @@ -119,6 +119,8 @@ private List planPartition( .getLatestMergedFileSlicesBeforeOrOn(partitionPath, snapshotTime) .collect(Collectors.toList()); for (FileSlice fileSlice : fileSlices) { + // Pending MOR compaction plans can expose empty file slices; they are not readable + // at the requested completed instant. 
if (fileSlice.isEmpty()) { continue; } diff --git a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/tiering/HudiCompactionService.java b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/tiering/HudiCompactionService.java index daacf9c7ee..f03ad27175 100644 --- a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/tiering/HudiCompactionService.java +++ b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/tiering/HudiCompactionService.java @@ -81,7 +81,16 @@ public class HudiCompactionService { private InternalSchemaManager internalSchemaManager; - public HudiCompactionService( + public static HudiCompactionService forScheduler(HudiWriteTableInfo hudiTableInfo) { + return new HudiCompactionService(hudiTableInfo, null, null); + } + + public static HudiCompactionService forExecutor( + HudiWriteTableInfo hudiTableInfo, TableBucket tableBucket, @Nullable String partition) { + return new HudiCompactionService(hudiTableInfo, tableBucket, partition); + } + + private HudiCompactionService( HudiWriteTableInfo hudiTableInfo, @Nullable TableBucket tableBucket, @Nullable String partition) { @@ -94,7 +103,7 @@ public HudiCompactionService( this.partition = partition; } - public boolean scheduleCompaction() { + public boolean scheduleCompaction() throws IOException { LOG.info("Scheduling Hudi compaction for table {}.", hudiTableInfo.getTablePath()); metaClient.reloadActiveTimeline(); try { @@ -107,11 +116,11 @@ public boolean scheduleCompaction() { } return scheduled; } catch (Exception e) { - LOG.warn( - "Failed to schedule Hudi compaction for table {}.", - hudiTableInfo.getTablePath(), + throw new IOException( + "Failed to schedule Hudi compaction for table " + + hudiTableInfo.getTablePath() + + ".", e); - return false; } } @@ -120,6 +129,10 @@ public void markSelectedCompactionsInflight() { if (compactionInstantTimes.isEmpty()) { return; } + compactionInstantTimes = 
validateSelectedCompactionInstantTimes(compactionInstantTimes); + if (compactionInstantTimes.isEmpty()) { + return; + } HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline(); @@ -141,6 +154,8 @@ public void markSelectedCompactionsInflight() { HoodieInstant requestedInstant = instantGenerator.getCompactionRequestedInstant(timestamp); if (pendingCompactionTimeline.containsInstant(requestedInstant)) { + // Each Fluss tiering table has a single lake committer, so requested->inflight + // transitions are serialized by the committer task for this table. table.getActiveTimeline().transitionCompactionRequestedToInflight(requestedInstant); LOG.info("Marked Hudi compaction instant {} as inflight.", timestamp); } @@ -219,26 +234,28 @@ public Map> executeCompaction( for (Pair planPair : compactionPlans) { String instantTime = planPair.getLeft(); HoodieCompactionPlan compactionPlan = planPair.getRight(); - for (HoodieCompactionOperation operation : compactionPlan.getOperations()) { - if (!belongsToCurrentWriter(operation, hudiPartitionPath, currentBucket)) { - continue; - } + metaClient.reload(); + try { + CompactionUtil.setAvroSchema(writeConfig, metaClient); + internalSchemaManager = null; + HoodieReaderContext readerContext = createReaderContext(); + for (HoodieCompactionOperation operation : compactionPlan.getOperations()) { + if (!belongsToCurrentWriter(operation, hudiPartitionPath, currentBucket)) { + continue; + } - HoodieFlinkMergeOnReadTableCompactor compactor = - new HoodieFlinkMergeOnReadTableCompactor<>(); - CompactionOperation compactionOperation = - CompactionOperation.convertFromAvroRecordInstance(operation); + HoodieFlinkMergeOnReadTableCompactor compactor = + new HoodieFlinkMergeOnReadTableCompactor<>(); + CompactionOperation compactionOperation = + CompactionOperation.convertFromAvroRecordInstance(operation); - metaClient.reload(); - try { - CompactionUtil.setAvroSchema(writeConfig, metaClient); List 
writeStatuses = compactor.compact( writeConfig, compactionOperation, instantTime, table.getTaskContextSupplier(), - createReaderContext(true), + readerContext, table); writeStatusesByInstant .computeIfAbsent(instantTime, ignored -> new ArrayList<>()) @@ -250,16 +267,11 @@ public Map> executeCompaction( hudiPartitionPath, currentBucket.getBucket(), instantTime); - } catch (Exception e) { - throw new IOException( - String.format( - "Failed to execute Hudi compaction for table %s, partition %s, bucket %s, instant %s.", - hudiTableInfo.getTablePath(), - hudiPartitionPath, - currentBucket.getBucket(), - instantTime), - e); } + } catch (Exception e) { + writeStatusesByInstant.remove(instantTime); + throw rollbackCompactionAfterFailure( + instantTime, hudiPartitionPath, currentBucket, e); } } return writeStatusesByInstant; @@ -273,9 +285,9 @@ public String commitCompaction( return null; } - String latestInstant = null; List compactionInstants = new ArrayList<>(compactionWriteStats.keySet()); Collections.sort(compactionInstants); + String latestInstant = compactionInstants.get(compactionInstants.size() - 1); for (String compactionInstant : compactionInstants) { HudiWriteStats writeStats = compactionWriteStats.get(compactionInstant); @@ -295,9 +307,6 @@ public String commitCompaction( } doCommitCompaction(compactionInstant, writeStats, snapshotProperties); - if (latestInstant == null || compactionInstant.compareTo(latestInstant) > 0) { - latestInstant = compactionInstant; - } LOG.info("Committed Hudi compaction instant {}.", compactionInstant); } return latestInstant; @@ -322,10 +331,51 @@ private List getSelectedCompactionInstantTimes() { .collect(Collectors.toList()); } - private HoodieReaderContext createReaderContext(boolean needReloadMetaClient) { + private List validateSelectedCompactionInstantTimes( + List compactionInstantTimes) { + HoodieTimeline pendingCompactionTimeline = + table.getActiveTimeline().filterPendingCompactionTimeline(); + InstantGenerator 
instantGenerator = table.getInstantGenerator(); + List validCompactionInstantTimes = new ArrayList<>(); + for (String timestamp : compactionInstantTimes) { + HoodieCompactionPlan compactionPlan = loadCompactionPlan(timestamp); + if (isValidCompactionPlan(compactionPlan)) { + validCompactionInstantTimes.add(timestamp); + continue; + } + + HoodieInstant inflightInstant = + instantGenerator.getCompactionInflightInstant(timestamp); + if (pendingCompactionTimeline.containsInstant(inflightInstant)) { + LOG.warn( + "Rollback invalid inflight Hudi compaction instant {} for table {}.", + timestamp, + hudiTableInfo.getTablePath()); + table.rollbackInflightCompaction( + inflightInstant, writeClient.getTransactionManager()); + metaClient.reloadActiveTimeline(); + } else { + LOG.warn( + "Skip invalid Hudi compaction plan {} for table {}.", + timestamp, + hudiTableInfo.getTablePath()); + } + } + return validCompactionInstantTimes; + } + + private HoodieCompactionPlan loadCompactionPlan(String timestamp) { + try { + return CompactionUtils.getCompactionPlan(metaClient, timestamp); + } catch (Exception e) { + throw new HoodieException("Failed to get Hudi compaction plan " + timestamp, e); + } + } + + private HoodieReaderContext createReaderContext() { Supplier internalSchemaManagerSupplier = () -> { - if (internalSchemaManager == null || needReloadMetaClient) { + if (internalSchemaManager == null) { internalSchemaManager = InternalSchemaManager.get(metaClient.getStorageConf(), metaClient); } @@ -340,6 +390,33 @@ private HoodieReaderContext createReaderContext(boolean needReloadMetaClient) Option.empty()); } + private IOException rollbackCompactionAfterFailure( + String instantTime, + String hudiPartitionPath, + TableBucket currentBucket, + Exception cause) { + IOException failure = + new IOException( + String.format( + "Failed to execute Hudi compaction for table %s, partition %s, bucket %s, instant %s.", + hudiTableInfo.getTablePath(), + hudiPartitionPath, + 
currentBucket.getBucket(), + instantTime), + cause); + try { + CompactionUtil.rollbackCompaction( + table, instantTime, writeClient.getTransactionManager()); + metaClient.reloadActiveTimeline(); + } catch (Exception rollbackFailure) { + failure.addSuppressed( + new IOException( + "Failed to rollback Hudi compaction instant " + instantTime + ".", + rollbackFailure)); + } + return failure; + } + private void doCommitCompaction( String instant, HudiWriteStats writeStats, Map snapshotProperties) throws IOException { @@ -382,11 +459,12 @@ private static boolean belongsToCurrentBucket( try { return BucketIdentifier.bucketIdFromFileId(fileId) == currentBucket.getBucket(); } catch (RuntimeException e) { - return fileId.contains(BucketIdentifier.bucketIdStr(currentBucket.getBucket())); + LOG.warn("Failed to parse Hudi bucket id from file id {}.", fileId, e); + return false; } } - private String toHudiPartitionPath(@Nullable String partitionName) { + private String toHudiPartitionPath(@Nullable String partitionName) throws IOException { if (partitionName == null || partitionName.isEmpty()) { return ""; } @@ -404,7 +482,10 @@ private String toHudiPartitionPath(@Nullable String partitionName) { String[] fields = partitionFields.split(","); String[] values = partitionName.split("\\$"); if (fields.length != values.length) { - return partitionPath; + throw new IOException( + String.format( + "Invalid Fluss partition name '%s' for Hudi hive-style partition fields '%s'. 
Expected %s values separated by '$' but got %s.", + partitionName, partitionFields, fields.length, values.length)); } List hiveStylePartitionSegments = new ArrayList<>(fields.length); @@ -432,7 +513,6 @@ private static List toWriteStatuses(HudiWriteStats writeStats) { writeStatus.setPartitionPath(stat.getPartitionPath()); writeStatuses.add(writeStatus); } - writeStatuses.get(0).setTotalErrorRecords(writeStats.getTotalErrorRecords()); return writeStatuses; } @@ -440,9 +520,12 @@ private static boolean isValidCompactionPlan(HoodieCompactionPlan plan) { return plan != null && plan.getOperations() != null && !plan.getOperations().isEmpty(); } - private static FlinkCompactionConfig toFlinkCompactionConfig(Configuration config) { + static FlinkCompactionConfig toFlinkCompactionConfig(Configuration config) { FlinkCompactionConfig compactionConfig = new FlinkCompactionConfig(); + // Hudi does not expose a reverse helper for FlinkCompactionConfig. Keep this mapping + // focused on the scheduling and cleaning fields used by Fluss tiering; CLI/service-mode + // fields intentionally keep Hudi defaults. 
compactionConfig.path = config.get(FlinkOptions.PATH); compactionConfig.compactionTriggerStrategy = config.get(FlinkOptions.COMPACTION_TRIGGER_STRATEGY); diff --git a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/tiering/HudiLakeCommitter.java b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/tiering/HudiLakeCommitter.java index 07398d02d7..47313e4072 100644 --- a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/tiering/HudiLakeCommitter.java +++ b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/tiering/HudiLakeCommitter.java @@ -28,6 +28,7 @@ import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; @@ -41,6 +42,7 @@ import java.io.IOException; import java.io.UncheckedIOException; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -68,11 +70,21 @@ public HudiLakeCommitter( this.hudiTableInfo = HudiWriteTableInfo.create(hudiCatalogProvider, tablePath); this.writeClient = hudiTableInfo.getWriteClient(); this.ckpMetadata = ckpMetadataProvider.get(tablePath, hudiTableInfo); - this.compactionService = new HudiCompactionService(hudiTableInfo, null, null); + this.compactionService = HudiCompactionService.forScheduler(hudiTableInfo); LOG.info( "Created HudiLakeCommitter with configuration {}.", hudiTableInfo.getFlinkConfig()); } + HudiLakeCommitter( + HudiWriteTableInfo hudiTableInfo, + CkpMetadata ckpMetadata, + HudiCompactionService compactionService) { + this.hudiTableInfo = hudiTableInfo; + this.writeClient = hudiTableInfo.getWriteClient(); + this.ckpMetadata = ckpMetadata; + this.compactionService = 
compactionService; + } + @Override public HudiCommittable toCommittable(List hudiWriteResults) { HudiCommittable.Builder committableBuilder = HudiCommittable.builder(); @@ -129,11 +141,8 @@ public LakeCommitResult commit( ckpMetadata.commitInstant(instant); LOG.info("Committed Hudi instant {} successfully.", instant); - String latestCommittedInstant = - commitCompactionAndSchedule( - committable.getCompactionWriteStats(), commitMetadata); - return LakeCommitResult.committedIsReadable( - parseSnapshotId(getLatestCommittedInstant(instant, latestCommittedInstant))); + commitCompactionAndSchedule(committable.getCompactionWriteStats()); + return LakeCommitResult.committedIsReadable(parseSnapshotId(instant)); } catch (Exception e) { if (e instanceof IOException) { throw (IOException) e; @@ -145,29 +154,29 @@ public LakeCommitResult commit( @Override public void abort(HudiCommittable committable) throws IOException { IOException failure = null; - for (String instant : committable.getWriteStats().keySet()) { + for (String instant : committable.getCompactionWriteStats().keySet()) { try { - abortInstant(instant); - LOG.info("Aborted Hudi instant {}.", instant); + abortCompactionInstant(instant); + LOG.info("Aborted Hudi compaction instant {}.", instant); } catch (IOException e) { failure = addSuppressed( failure, new IOException( - "Failed to abort Hudi instant " + instant + ".", e)); + "Failed to abort Hudi compaction instant " + instant + ".", + e)); } } - for (String instant : committable.getCompactionWriteStats().keySet()) { + for (String instant : committable.getWriteStats().keySet()) { try { - abortCompactionInstant(instant); - LOG.info("Aborted Hudi compaction instant {}.", instant); + abortInstant(instant); + LOG.info("Aborted Hudi instant {}.", instant); } catch (IOException e) { failure = addSuppressed( failure, new IOException( - "Failed to abort Hudi compaction instant " + instant + ".", - e)); + "Failed to abort Hudi instant " + instant + ".", e)); } } if 
(failure != null) { @@ -211,26 +220,16 @@ public void close() throws Exception { IOUtils.closeQuietly(hudiTableInfo, "hudi table info"); } - @Nullable - private String commitCompactionAndSchedule( - Map compactionWriteStats, Map commitMetadata) { + private void commitCompactionAndSchedule(Map compactionWriteStats) + throws IOException { if (!isAutoCompactionEnabled()) { - return null; + return; } - String latestCommittedInstant = null; - try { - latestCommittedInstant = - compactionService.commitCompaction(compactionWriteStats, commitMetadata); - if (latestCommittedInstant != null) { - LOG.info("Committed Hudi compaction instant {}.", latestCommittedInstant); - } - } catch (Exception e) { - LOG.warn( - "Failed to commit Hudi compaction for table {}. " - + "The next tiering round can rollback and retry pending compactions.", - hudiTableInfo.getTablePath(), - e); + String latestCommittedInstant = + compactionService.commitCompaction(compactionWriteStats, Collections.emptyMap()); + if (latestCommittedInstant != null) { + LOG.info("Committed Hudi compaction instant {}.", latestCommittedInstant); } try { @@ -242,7 +241,6 @@ private String commitCompactionAndSchedule( hudiTableInfo.getTablePath(), e); } - return latestCommittedInstant; } private boolean isAutoCompactionEnabled() { @@ -250,14 +248,6 @@ private boolean isAutoCompactionEnabled() { && hudiTableInfo.getFlinkConfig().get(FlinkOptions.COMPACTION_SCHEDULE_ENABLED); } - static String getLatestCommittedInstant( - String dataCommitInstant, @Nullable String compactionInstant) { - if (compactionInstant == null || dataCommitInstant.compareTo(compactionInstant) >= 0) { - return dataCommitInstant; - } - return compactionInstant; - } - private void validateWriteStats(String instant, HudiWriteStats writeStats) { long totalErrorRecords = writeStats.getTotalErrorRecords(); if (totalErrorRecords > 0 @@ -303,6 +293,7 @@ private void abortInstant(String instant) throws IOException { private void abortCompactionInstant(String 
instant) throws IOException { try { + hudiTableInfo.getMetaClient().reloadActiveTimeline(); CompactionUtil.rollbackCompaction( writeClient.getHoodieTable(), instant, writeClient.getTransactionManager()); } catch (Exception e) { @@ -334,7 +325,9 @@ private static boolean isCommittedBy( throw new IOException("Failed to load committed Hudi instant metadata."); } Map extraMetadata = metadata.getExtraMetadata(); - return extraMetadata != null && commitUser.equals(extraMetadata.get(COMMITTER_USER)); + return metadata.getOperationType() != WriteOperationType.COMPACT + && extraMetadata != null + && commitUser.equals(extraMetadata.get(COMMITTER_USER)); } catch (IOException e) { // a read failure must not be silently treated as "not committed by Fluss", // otherwise we may miss an already-tiered snapshot and re-commit duplicated data diff --git a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/tiering/HudiLakeWriter.java b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/tiering/HudiLakeWriter.java index 4abf2c1ec4..c01415e1e1 100644 --- a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/tiering/HudiLakeWriter.java +++ b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/tiering/HudiLakeWriter.java @@ -24,6 +24,7 @@ import org.apache.fluss.lake.writer.WriterInitContext; import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.record.LogRecord; +import org.apache.fluss.utils.TimeUtils; import org.apache.fluss.utils.concurrent.ExecutorThreadFactory; import org.apache.flink.configuration.Configuration; @@ -41,13 +42,17 @@ import javax.annotation.Nullable; import java.io.IOException; +import java.time.Duration; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import 
java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import static org.apache.fluss.lake.writer.WriterInitContext.UNKNOWN_SPLIT_INDEX; import static org.apache.fluss.lake.writer.WriterInitContext.UNKNOWN_TIERING_ROUND_TIMESTAMP; @@ -58,10 +63,19 @@ public class HudiLakeWriter implements LakeWriter { private static final Logger LOG = LoggerFactory.getLogger(HudiLakeWriter.class); + private static final String COMPACTION_COMPLETE_TIMEOUT_KEY = + "fluss.tiering.compaction.complete-timeout"; + private static final String COMPACTION_SHUTDOWN_TIMEOUT_KEY = + "fluss.tiering.compaction.shutdown-timeout"; + private static final Duration DEFAULT_COMPACTION_COMPLETE_TIMEOUT = Duration.ofMinutes(30); + private static final Duration DEFAULT_COMPACTION_SHUTDOWN_TIMEOUT = Duration.ofSeconds(30); + private final RecordWriter recordWriter; private final TableInfo tableInfo; private final HudiWriteTableInfo hudiTableInfo; private final CkpMetadata ckpMetadata; + private final long compactionCompleteTimeoutMs; + private final long compactionShutdownTimeoutMs; @Nullable private final ExecutorService compactionExecutor; @Nullable private final CompletableFuture>> compactionFuture; @@ -75,6 +89,16 @@ public HudiLakeWriter( this.hudiTableInfo = HudiWriteTableInfo.create(hudiCatalogProvider, tableInfo.getTablePath()); this.ckpMetadata = ckpMetadataProvider.get(tableInfo.getTablePath(), hudiTableInfo); + this.compactionCompleteTimeoutMs = + getTimeoutMillis( + hudiTableInfo.getFlinkConfig(), + COMPACTION_COMPLETE_TIMEOUT_KEY, + DEFAULT_COMPACTION_COMPLETE_TIMEOUT); + this.compactionShutdownTimeoutMs = + getTimeoutMillis( + hudiTableInfo.getFlinkConfig(), + COMPACTION_SHUTDOWN_TIMEOUT_KEY, + DEFAULT_COMPACTION_SHUTDOWN_TIMEOUT); if (writerInitContext.splitIndex() == 0) { ckpMetadata.bootstrap(); @@ -86,17 +110,19 @@ public HudiLakeWriter( } this.recordWriter = new HudiRecordWriter(writerInitContext, hudiTableInfo, ckpMetadata); 
- if (shouldRunCompaction(writerInitContext)) { - this.compactionExecutor = + ExecutorService createdCompactionExecutor = null; + if (shouldRunCompaction()) { + createdCompactionExecutor = Executors.newSingleThreadExecutor( new ExecutorThreadFactory( "hudi-compact-" + writerInitContext.tableBucket())); - this.compactionFuture = executeCompactionAsync(hudiCatalogProvider, writerInitContext); - } else { - this.compactionExecutor = null; - this.compactionFuture = null; } + this.compactionExecutor = createdCompactionExecutor; LOG.info("Created HudiLakeWriter with configuration {}.", hudiTableInfo.getFlinkConfig()); + this.compactionFuture = + compactionExecutor == null + ? null + : executeCompactionAsync(hudiCatalogProvider, writerInitContext); } @Override @@ -114,9 +140,11 @@ public HudiWriteResult complete() throws IOException { Map> writeStatuses = recordWriter.complete(); Map> compactionWriteStatuses = Collections.emptyMap(); if (compactionFuture != null) { - compactionWriteStatuses = compactionFuture.get(); + compactionWriteStatuses = waitForCompaction(); } return HudiWriteResult.fromWriteStatuses(writeStatuses, compactionWriteStatuses); + } catch (IOException e) { + throw e; } catch (Exception e) { throw new IOException("Failed to complete Hudi write.", e); } @@ -134,18 +162,27 @@ public void close() throws IOException { @Nullable private IOException closeCompactionExecutor() { + if (compactionExecutor == null) { + return null; + } + try { - if (compactionFuture != null && !compactionFuture.isDone()) { - compactionFuture.cancel(true); - } - if (compactionExecutor != null) { - compactionExecutor.shutdown(); - if (!compactionExecutor.awaitTermination(30, TimeUnit.SECONDS)) { - LOG.warn("Failed to close Hudi compaction executor."); + cancelCompaction(); + compactionExecutor.shutdown(); + if (!compactionExecutor.awaitTermination( + compactionShutdownTimeoutMs, TimeUnit.MILLISECONDS)) { + compactionExecutor.shutdownNow(); + if (!compactionExecutor.awaitTermination( + 
compactionShutdownTimeoutMs, TimeUnit.MILLISECONDS)) { + return new IOException( + String.format( + "Failed to close Hudi compaction executor within %s ms.", + compactionShutdownTimeoutMs)); } } return null; } catch (InterruptedException e) { + compactionExecutor.shutdownNow(); Thread.currentThread().interrupt(); return new IOException("Interrupted while closing Hudi compaction executor.", e); } catch (Exception e) { @@ -153,6 +190,31 @@ private IOException closeCompactionExecutor() { } } + private Map> waitForCompaction() throws IOException { + try { + return compactionFuture.get(compactionCompleteTimeoutMs, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + cancelCompaction(); + throw new IOException( + String.format( + "Timed out after %s ms waiting for Hudi compaction to finish.", + compactionCompleteTimeoutMs), + e); + } catch (InterruptedException e) { + cancelCompaction(); + Thread.currentThread().interrupt(); + throw new IOException("Interrupted while waiting for Hudi compaction to finish.", e); + } catch (ExecutionException e) { + throw new IOException("Failed to execute Hudi compaction.", e.getCause()); + } + } + + private void cancelCompaction() { + if (compactionFuture != null && !compactionFuture.isDone()) { + compactionFuture.cancel(true); + } + } + @Nullable private static IOException close( @Nullable IOException failure, AutoCloseable closeable, String resourceName) { @@ -170,20 +232,22 @@ private static IOException close( } } - private boolean shouldRunCompaction(WriterInitContext writerInitContext) { - return writerInitContext.tableInfo().getTableConfig().isDataLakeAutoCompaction() - && hudiTableInfo.getTableType() == HoodieTableType.MERGE_ON_READ; + private boolean shouldRunCompaction() { + return hudiTableInfo.getTableType() == HoodieTableType.MERGE_ON_READ + && hudiTableInfo.getFlinkConfig().get(FlinkOptions.COMPACTION_SCHEDULE_ENABLED); } private CompletableFuture>> executeCompactionAsync( HudiCatalogProvider hudiCatalogProvider, 
WriterInitContext writerInitContext) { return CompletableFuture.supplyAsync( () -> { + // Compaction runs in a separate thread, so it intentionally uses an isolated + // Hudi write client instead of sharing the data writer's client state. try (HudiWriteTableInfo compactionTableInfo = HudiWriteTableInfo.create( hudiCatalogProvider, writerInitContext.tablePath())) { HudiCompactionService compactionService = - new HudiCompactionService( + HudiCompactionService.forExecutor( compactionTableInfo, writerInitContext.tableBucket(), writerInitContext.partition()); @@ -193,12 +257,12 @@ private CompletableFuture>> executeCompactionAsync compactionService.getCompactionPlans(instantTimes); return compactionService.executeCompaction(compactionPlans); } catch (Exception e) { - LOG.warn( - "Failed to execute Hudi compaction for table {}, bucket {}.", - writerInitContext.tablePath(), - writerInitContext.tableBucket(), + throw new CompletionException( + String.format( + "Failed to execute Hudi compaction for table %s, bucket %s.", + writerInitContext.tablePath(), + writerInitContext.tableBucket()), e); - return Collections.emptyMap(); } }, compactionExecutor); @@ -233,4 +297,19 @@ private static void validateWriterInitContext(WriterInitContext writerInitContex writerInitContext.tieringRoundTimestamp() != UNKNOWN_TIERING_ROUND_TIMESTAMP, "Hudi lake writer requires tiering round timestamp in WriterInitContext."); } + + private static long getTimeoutMillis(Configuration config, String key, Duration defaultTimeout) + throws IOException { + try { + Duration timeout = + TimeUtils.parseDuration( + config.getString(key, TimeUtils.getStringInMillis(defaultTimeout))); + if (timeout.toMillis() <= 0) { + throw new IllegalArgumentException("timeout must be greater than 0 ms"); + } + return timeout.toMillis(); + } catch (RuntimeException e) { + throw new IOException("Invalid Hudi compaction timeout option " + key + ".", e); + } + } } diff --git 
a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/HudiTableInfo.java b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/HudiTableInfo.java index 72b90d87fc..21c8f83645 100644 --- a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/HudiTableInfo.java +++ b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/HudiTableInfo.java @@ -111,12 +111,12 @@ public static HudiTableInfo create(TablePath tablePath, Configuration hudiConfig HoodieTableFileSystemView fileSystemView = null; HoodieTableMetaClient metaClient = createMetaClient(basePath, hudiConfig); HoodieTimeline completedTimeline = - metaClient - .getCommitsAndCompactionTimeline() - .filterCompletedAndCompactionInstants(); + metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants(); HoodieEngineContext engineContext = new HoodieLocalEngineContext(metaClient.getStorageConf()); try { + // The source planner gates readable snapshots with completedTimeline. The file + // system view still needs pending compaction instants as MOR file-slice boundaries. 
HoodieTimeline fileSystemViewTimeline = metaClient .getCommitsAndCompactionTimeline() diff --git a/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/testutils/FlinkHudiTieringTestBase.java b/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/testutils/FlinkHudiTieringTestBase.java index b33896f22a..f678c3766c 100644 --- a/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/testutils/FlinkHudiTieringTestBase.java +++ b/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/testutils/FlinkHudiTieringTestBase.java @@ -81,6 +81,7 @@ import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.POLL_TIERING_TABLE_INTERVAL; import static org.apache.fluss.lake.committer.LakeCommitter.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY; +import static org.apache.fluss.lake.writer.LakeTieringFactory.FLUSS_LAKE_TIERING_COMMIT_USER; import static org.apache.fluss.testutils.DataTestUtils.row; import static org.apache.fluss.testutils.common.CommonTestUtils.retry; import static org.apache.fluss.testutils.common.CommonTestUtils.waitUntil; @@ -92,6 +93,10 @@ public abstract class FlinkHudiTieringTestBase { protected static final String DEFAULT_DB = "fluss"; protected static final String CATALOG_NAME = "testcatalog"; + protected static final String HUDI_CONF_PREFIX = "hudi."; + + private static final Duration HUDI_COMPACTION_COMMIT_TIMEOUT = Duration.ofSeconds(30); + private static final String COMMITTER_USER = "commit-user"; protected StreamExecutionEnvironment execEnv; @@ -185,11 +190,12 @@ protected long createPkTable( .distributedBy(bucketNum) .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true") .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500)) - .customProperty("hudi.precombine.field", "f_time"); + .customProperty(HUDI_CONF_PREFIX + "precombine.field", "f_time"); if (enableAutoCompaction) { tableBuilder.property(ConfigOptions.TABLE_DATALAKE_AUTO_COMPACTION.key(), "true"); - 
tableBuilder.customProperty("hudi." + FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "1"); + tableBuilder.customProperty( + HUDI_CONF_PREFIX + FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "1"); } return createTable(tablePath, tableBuilder.build()); } @@ -203,12 +209,14 @@ protected long createLogTable( .distributedBy(bucketNum, "f_int") .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true") .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500)) - .customProperty("hudi.precombine.field", "f_str") - .customProperty("hudi." + FlinkOptions.RECORD_KEY_FIELD.key(), "f_int"); + .customProperty(HUDI_CONF_PREFIX + "precombine.field", "f_str") + .customProperty( + HUDI_CONF_PREFIX + FlinkOptions.RECORD_KEY_FIELD.key(), "f_int"); if (enableAutoCompaction) { tableBuilder.property(ConfigOptions.TABLE_DATALAKE_AUTO_COMPACTION.key(), "true"); - tableBuilder.customProperty("hudi." + FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "1"); + tableBuilder.customProperty( + HUDI_CONF_PREFIX + FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "1"); } return createTable(tablePath, tableBuilder.build()); } @@ -339,31 +347,37 @@ protected void checkDataInHudiMORTable( expectedRecords.add(formatMORRow(row)); } - List actualRecords = - collectHudiRows( - tablePath, - partition, - bucket, - record -> - record.getBoolean(5) - + "," - + record.getInt(6) - + "," - + record.getLong(7) - + "," - + record.getFloat(8) - + "," - + record.getDouble(9) - + "," - + record.getString(10).toString() - + "," - + record.getDecimal(11, 5, 2).toBigDecimal().toPlainString() - + "," - + record.getDecimal(12, 20, 0) - .toBigDecimal() - .toPlainString()); - - assertThat(actualRecords).containsExactlyInAnyOrderElementsOf(expectedRecords); + retry( + Duration.ofMinutes(1), + () -> { + List actualRecords = + collectHudiRows( + tablePath, + partition, + bucket, + record -> + record.getBoolean(5) + + "," + + record.getInt(6) + + "," + + record.getLong(7) + + "," + + record.getFloat(8) + + "," + + 
record.getDouble(9) + + "," + + record.getString(10).toString() + + "," + + record.getDecimal(11, 5, 2) + .toBigDecimal() + .toPlainString() + + "," + + record.getDecimal(12, 20, 0) + .toBigDecimal() + .toPlainString()); + + assertThat(actualRecords).containsExactlyInAnyOrderElementsOf(expectedRecords); + }); } protected void checkDataInHudiMORPartitionTable( @@ -435,16 +449,10 @@ protected void checkFlussOffsetsInSnapshot( HudiTableInfo.create(tablePath, Configuration.fromMap(getHudiCatalogConf()))) { HoodieTableMetaClient metaClient = hudiTableInfo.getMetaClient(); metaClient.reloadActiveTimeline(); - HoodieTimeline timeline = - metaClient - .getActiveTimeline() - .getCommitsAndCompactionTimeline() - .filterCompletedInstants(); - Optional latestInstant = - timeline.getReverseOrderedInstantsByCompletionTime().findFirst(); - assertThat(latestInstant).isPresent(); + HoodieTimeline timeline = hudiTableInfo.getCompletedTimeline(); + HoodieInstant latestInstant = getLatestFlussDataInstant(hudiTableInfo); - HoodieCommitMetadata metadata = timeline.readCommitMetadata(latestInstant.get()); + HoodieCommitMetadata metadata = timeline.readCommitMetadata(latestInstant); Map extraMetadata = metadata.getExtraMetadata(); assertThat(extraMetadata).isNotNull(); String offsetFile = extraMetadata.get(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY); @@ -462,26 +470,8 @@ protected void checkFlussOffsetsInSnapshot( protected void checkHudiCompactionCommitted(TablePath tablePath) { retry( - Duration.ofMinutes(1), - () -> { - try (HudiTableInfo hudiTableInfo = - HudiTableInfo.create( - tablePath, Configuration.fromMap(getHudiCatalogConf()))) { - HoodieTimeline timeline = - hudiTableInfo - .getMetaClient() - .getActiveTimeline() - .getCommitsAndCompactionTimeline() - .filterCompletedInstants(); - assertThat( - timeline.getInstantsAsStream() - .anyMatch( - instant -> - isCompactionInstant( - timeline, instant))) - .isTrue(); - } - }); + HUDI_COMPACTION_COMMIT_TIMEOUT, + () -> 
assertThat(hasHudiCompactionCommitted(tablePath)).isTrue()); } private static String formatMORRow(InternalRow row) { @@ -551,16 +541,7 @@ private List collectHudiRows( private static List getLatestFileSlicesAtCompletedInstant( HudiTableInfo hudiTableInfo, String partition) { - HoodieTimeline completedTimeline = hudiTableInfo.getCompletedTimeline(); - HoodieInstant latestInstant = - completedTimeline - .lastInstant() - .orElseThrow( - () -> - new IllegalStateException( - "No completed Hudi instant found for table " - + hudiTableInfo.getTablePath() - + ".")); + HoodieInstant latestInstant = getLatestFlussDataInstant(hudiTableInfo); String latestInstantTime = latestInstant.requestedTime(); if (hudiTableInfo.getTableType() == HoodieTableType.MERGE_ON_READ) { return hudiTableInfo @@ -568,6 +549,8 @@ private static List getLatestFileSlicesAtCompletedInstant( .getLatestMergedFileSlicesBeforeOrOn(partition, latestInstantTime) .collect(Collectors.toList()); } + // includeFileSliceBefore=true lets COW reads include the base file slice visible at this + // completed instant. 
return hudiTableInfo .getFileSystemView() .getLatestFileSlicesBeforeOrOn(partition, latestInstantTime, true) @@ -586,12 +569,56 @@ private org.apache.flink.configuration.Configuration buildFlinkHudiOptions( return org.apache.flink.configuration.Configuration.fromMap(hudiOptions); } + private static HoodieInstant getLatestFlussDataInstant(HudiTableInfo hudiTableInfo) { + HoodieTimeline completedTimeline = hudiTableInfo.getCompletedTimeline(); + return completedTimeline + .getReverseOrderedInstants() + .filter(instant -> isFlussDataInstant(completedTimeline, instant)) + .findFirst() + .orElseThrow( + () -> + new IllegalStateException( + "No Fluss data instant found for Hudi table " + + hudiTableInfo.getTablePath() + + ".")); + } + + private static boolean isFlussDataInstant(HoodieTimeline timeline, HoodieInstant instant) { + try { + HoodieCommitMetadata metadata = timeline.readCommitMetadata(instant); + Map extraMetadata = + metadata == null ? null : metadata.getExtraMetadata(); + return metadata != null + && metadata.getOperationType() != WriteOperationType.COMPACT + && extraMetadata != null + && FLUSS_LAKE_TIERING_COMMIT_USER.equals(extraMetadata.get(COMMITTER_USER)); + } catch (Exception e) { + throw new RuntimeException("Failed to read Hudi instant metadata " + instant + ".", e); + } + } + + private static boolean hasHudiCompactionCommitted(TablePath tablePath) { + try (HudiTableInfo hudiTableInfo = + HudiTableInfo.create(tablePath, Configuration.fromMap(getHudiCatalogConf()))) { + HoodieTimeline timeline = + hudiTableInfo + .getMetaClient() + .getActiveTimeline() + .getCommitsAndCompactionTimeline() + .filterCompletedInstants(); + return timeline.getInstantsAsStream() + .anyMatch(instant -> isCompactionInstant(timeline, instant)); + } catch (Exception e) { + return false; + } + } + private static boolean isCompactionInstant(HoodieTimeline timeline, HoodieInstant instant) { try { HoodieCommitMetadata metadata = timeline.readCommitMetadata(instant); return 
metadata != null && metadata.getOperationType() == WriteOperationType.COMPACT; } catch (Exception e) { - throw new RuntimeException("Failed to read Hudi instant metadata " + instant + ".", e); + return false; } } diff --git a/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/tiering/HudiTieringITCase.java b/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/tiering/HudiTieringITCase.java index 176d135d19..707fb6d393 100644 --- a/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/tiering/HudiTieringITCase.java +++ b/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/tiering/HudiTieringITCase.java @@ -97,7 +97,7 @@ protected static void beforeAll() { @Test void testTiering() throws Exception { TablePath pkTablePath = TablePath.of(DEFAULT_DB, "pkTable"); - long pkTableId = createPkTable(pkTablePath, 1, true, PK_SCHEMA); + long pkTableId = createPkTable(pkTablePath, 1, false, PK_SCHEMA); TableBucket pkTableBucket = new TableBucket(pkTableId, 0); List rows = Arrays.asList(pkRow(1, "v1"), pkRow(2, "v2"), pkRow(3, "v3")); @@ -121,16 +121,45 @@ void testTiering() throws Exception { checkDataInHudiMORTable(pkTablePath, "", rows, 0); checkFlussOffsetsInSnapshot(pkTablePath, Collections.singletonMap(pkTableBucket, 9L)); + testPartitionedTableTiering(); + } finally { + jobClient.cancel().get(); + } + } + + @Test + void testPkTableTieringWithAutoCompaction() throws Exception { + TablePath pkTablePath = TablePath.of(DEFAULT_DB, "pkTableWithAutoCompaction"); + long pkTableId = createPkTable(pkTablePath, 1, true, PK_SCHEMA); + TableBucket pkTableBucket = new TableBucket(pkTableId, 0); + + List rows = Arrays.asList(pkRow(1, "v1"), pkRow(2, "v2"), pkRow(3, "v3")); + writeRows(pkTablePath, rows, false); + waitUntilSnapshot(pkTableId, 1); + + execEnv.enableCheckpointing(1000); + JobClient jobClient = buildTieringJob(execEnv); + try { + assertReplicaStatus(pkTableBucket, 3); + checkDataInHudiMORTable(pkTablePath, "", 
rows, 0); + checkFlussOffsetsInSnapshot(pkTablePath, Collections.singletonMap(pkTableBucket, 3L)); + + rows = Arrays.asList(pkRow(1, "v111"), pkRow(2, "v222"), pkRow(3, "v333")); + writeRows(pkTablePath, rows, false); + + // 3 initial records + 3 delete records + 3 insert records. + assertReplicaStatus(pkTableBucket, 9); + checkDataInHudiMORTable(pkTablePath, "", rows, 0); + checkFlussOffsetsInSnapshot(pkTablePath, Collections.singletonMap(pkTableBucket, 9L)); + rows = Arrays.asList(pkRow(1, "v1111"), pkRow(2, "v2222"), pkRow(3, "v3333")); writeRows(pkTablePath, rows, false); - // 3 current records + 3 delete records + 3 insert records. + // 9 previous records + 3 delete records + 3 insert records. assertReplicaStatus(pkTableBucket, 15); checkDataInHudiMORTable(pkTablePath, "", rows, 0); checkFlussOffsetsInSnapshot(pkTablePath, Collections.singletonMap(pkTableBucket, 15L)); checkHudiCompactionCommitted(pkTablePath); - - testPartitionedTableTiering(); } finally { jobClient.cancel().get(); } @@ -206,8 +235,9 @@ private Tuple2 createPartitionedTable(TablePath partition AutoPartitionTimeUnit.YEAR) .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true) .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500)) - .customProperty("hudi.precombine.field", "date") - .customProperty("hudi.hoodie.datasource.write.recordkey.field", "id") + .customProperty(HUDI_CONF_PREFIX + "precombine.field", "date") + .customProperty( + HUDI_CONF_PREFIX + "hoodie.datasource.write.recordkey.field", "id") .build(); return Tuple2.of( createTable(partitionedTablePath, partitionedTableDescriptor), diff --git a/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/tiering/HudiTieringTest.java b/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/tiering/HudiTieringTest.java index 58e6b8f215..949aba6be0 100644 --- a/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/tiering/HudiTieringTest.java +++ 
b/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/tiering/HudiTieringTest.java @@ -20,8 +20,10 @@ import org.apache.fluss.config.Configuration; import org.apache.fluss.lake.committer.CommittedLakeSnapshot; import org.apache.fluss.lake.committer.CommitterInitContext; +import org.apache.fluss.lake.committer.LakeCommitResult; import org.apache.fluss.lake.committer.LakeCommitter; import org.apache.fluss.lake.hudi.HudiLakeCatalog; +import org.apache.fluss.lake.hudi.utils.meta.CkpMetadata; import org.apache.fluss.lake.lakestorage.TestingLakeCatalogContext; import org.apache.fluss.lake.writer.LakeWriter; import org.apache.fluss.lake.writer.WriterInitContext; @@ -37,27 +39,50 @@ import org.apache.fluss.row.GenericRow; import org.apache.fluss.types.DataTypes; +import org.apache.hudi.client.HoodieFlinkWriteClient; +import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.Option; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.sink.compact.FlinkCompactionConfig; +import org.apache.hudi.sink.compact.strategy.CompactionPlanStrategy; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import org.mockito.InOrder; import javax.annotation.Nullable; import java.io.File; import java.io.IOException; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import static org.apache.fluss.lake.committer.LakeCommitter.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY; import static org.apache.fluss.lake.writer.LakeTieringFactory.FLUSS_LAKE_TIERING_COMMIT_USER; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyList; +import static 
org.mockito.Mockito.anyMap; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; /** Test for tiering Fluss records to Hudi via {@link HudiLakeTieringFactory}. */ class HudiTieringTest { + private static final String HUDI_CONF_PREFIX = "hudi."; + private static final String DATA_INSTANT = "20260624000200000"; + private static final String COMPACTION_INSTANT = "20260624000300000"; + @TempDir private File tempWarehouseDir; private Configuration hudiConfig; @@ -154,17 +179,123 @@ void testRejectMissingWriteStatsOnCommit() throws Exception { } @Test - void testLatestCommittedInstantNeverGoesBackwards() { - assertThat(HudiLakeCommitter.getLatestCommittedInstant("20260624000200000", null)) - .isEqualTo("20260624000200000"); - assertThat( - HudiLakeCommitter.getLatestCommittedInstant( - "20260624000200000", "20260624000100000")) - .isEqualTo("20260624000200000"); - assertThat( - HudiLakeCommitter.getLatestCommittedInstant( - "20260624000200000", "20260624000300000")) - .isEqualTo("20260624000300000"); + void testCommitReturnsDataInstantAndSchedulesCompaction() throws Exception { + TestingCommitterContext context = createTestingCommitterContext(); + HudiCommittable committable = committableWithCompaction(); + whenDataCommitSucceeds(context); + when(context.compactionService.commitCompaction( + eq(committable.getCompactionWriteStats()), anyMap())) + .thenReturn(COMPACTION_INSTANT); + + LakeCommitResult commitResult = + context.committer.commit(committable, Collections.emptyMap()); + + assertThat(commitResult.getCommittedSnapshotId()).isEqualTo(Long.parseLong(DATA_INSTANT)); + InOrder inOrder = + inOrder(context.writeClient, context.ckpMetadata, context.compactionService); + inOrder.verify(context.writeClient) + 
.commitStats(eq(DATA_INSTANT), anyList(), any(Option.class), eq("commit")); + inOrder.verify(context.ckpMetadata).commitInstant(DATA_INSTANT); + inOrder.verify(context.compactionService) + .commitCompaction(eq(committable.getCompactionWriteStats()), anyMap()); + inOrder.verify(context.compactionService).scheduleCompaction(); + inOrder.verify(context.compactionService).markSelectedCompactionsInflight(); + } + + @Test + void testCommitFailsWhenCompactionCommitFails() throws Exception { + TestingCommitterContext context = createTestingCommitterContext(); + HudiCommittable committable = committableWithCompaction(); + whenDataCommitSucceeds(context); + doThrow(new IOException("compaction failed")) + .when(context.compactionService) + .commitCompaction(eq(committable.getCompactionWriteStats()), anyMap()); + + assertThatThrownBy(() -> context.committer.commit(committable, Collections.emptyMap())) + .isInstanceOf(IOException.class) + .hasMessageContaining("compaction failed"); + verify(context.compactionService, never()).scheduleCompaction(); + verify(context.compactionService, never()).markSelectedCompactionsInflight(); + } + + @Test + void testScheduleCompactionFailureDoesNotFailCommit() throws Exception { + TestingCommitterContext context = createTestingCommitterContext(); + HudiCommittable committable = committableWithCompaction(); + whenDataCommitSucceeds(context); + when(context.compactionService.commitCompaction( + eq(committable.getCompactionWriteStats()), anyMap())) + .thenReturn(COMPACTION_INSTANT); + doThrow(new IOException("schedule failed")) + .when(context.compactionService) + .scheduleCompaction(); + + LakeCommitResult commitResult = + context.committer.commit(committable, Collections.emptyMap()); + + assertThat(commitResult.getCommittedSnapshotId()).isEqualTo(Long.parseLong(DATA_INSTANT)); + verify(context.compactionService, never()).markSelectedCompactionsInflight(); + } + + @Test + void testAbortRollsBackCompactionBeforeDataInstant() throws Exception { + 
TestingCommitterContext context = createTestingCommitterContext(); + when(context.writeClient.getHoodieTable()) + .thenThrow(new RuntimeException("rollback failed")); + when(context.writeClient.rollback(DATA_INSTANT)).thenReturn(true); + + assertThatThrownBy(() -> context.committer.abort(committableWithCompaction())) + .isInstanceOf(IOException.class) + .hasMessageContaining(COMPACTION_INSTANT); + InOrder inOrder = inOrder(context.metaClient, context.writeClient, context.ckpMetadata); + inOrder.verify(context.metaClient).reloadActiveTimeline(); + inOrder.verify(context.writeClient).getHoodieTable(); + inOrder.verify(context.writeClient).rollback(DATA_INSTANT); + inOrder.verify(context.ckpMetadata).abortInstant(DATA_INSTANT); + } + + @Test + void testFlinkCompactionConfigMapping() { + org.apache.flink.configuration.Configuration config = + new org.apache.flink.configuration.Configuration(); + config.set(FlinkOptions.PATH, "/tmp/hudi/table"); + config.set(FlinkOptions.COMPACTION_TRIGGER_STRATEGY, FlinkCompactionConfig.NUM_OR_TIME); + config.set(FlinkOptions.ARCHIVE_MAX_COMMITS, 30); + config.set(FlinkOptions.ARCHIVE_MIN_COMMITS, 20); + config.set(FlinkOptions.CLEAN_POLICY, "KEEP_LATEST_COMMITS"); + config.set(FlinkOptions.CLEAN_RETAIN_COMMITS, 10); + config.set(FlinkOptions.CLEAN_RETAIN_HOURS, 24); + config.set(FlinkOptions.CLEAN_RETAIN_FILE_VERSIONS, 5); + config.set(FlinkOptions.COMPACTION_DELTA_COMMITS, 2); + config.set(FlinkOptions.COMPACTION_DELTA_SECONDS, 120); + config.set(FlinkOptions.COMPACTION_MAX_MEMORY, 256); + config.set(FlinkOptions.COMPACTION_TARGET_IO, 1024L); + config.set(FlinkOptions.COMPACTION_TASKS, 3); + config.set(FlinkOptions.CLEAN_ASYNC_ENABLED, true); + config.set(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, true); + + FlinkCompactionConfig compactionConfig = + HudiCompactionService.toFlinkCompactionConfig(config); + + assertThat(compactionConfig.path).isEqualTo("/tmp/hudi/table"); + assertThat(compactionConfig.compactionTriggerStrategy) + 
.isEqualTo(FlinkCompactionConfig.NUM_OR_TIME); + assertThat(compactionConfig.archiveMaxCommits).isEqualTo(30); + assertThat(compactionConfig.archiveMinCommits).isEqualTo(20); + assertThat(compactionConfig.cleanPolicy).isEqualTo("KEEP_LATEST_COMMITS"); + assertThat(compactionConfig.cleanRetainCommits).isEqualTo(10); + assertThat(compactionConfig.cleanRetainHours).isEqualTo(24); + assertThat(compactionConfig.cleanRetainFileVersions).isEqualTo(5); + assertThat(compactionConfig.compactionDeltaCommits).isEqualTo(2); + assertThat(compactionConfig.compactionDeltaSeconds).isEqualTo(120); + assertThat(compactionConfig.compactionMaxMemory).isEqualTo(256); + assertThat(compactionConfig.compactionTargetIo).isEqualTo(1024L); + assertThat(compactionConfig.compactionTasks).isEqualTo(3); + assertThat(compactionConfig.cleanAsyncEnable).isTrue(); + assertThat(compactionConfig.schedule).isTrue(); + assertThat(compactionConfig.compactionPlanSelectStrategy) + .isEqualTo(CompactionPlanStrategy.NUM_INSTANTS); + assertThat(compactionConfig.maxNumCompactionPlans).isEqualTo(1); } private LakeCommitter createLakeCommitter( @@ -173,6 +304,46 @@ private LakeCommitter createLakeCommitter( new TestingCommitterInitContext(tablePath, tableInfo)); } + private static TestingCommitterContext createTestingCommitterContext() { + HudiWriteTableInfo hudiTableInfo = mock(HudiWriteTableInfo.class); + HoodieFlinkWriteClient writeClient = mock(HoodieFlinkWriteClient.class); + HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class); + CkpMetadata ckpMetadata = mock(CkpMetadata.class); + HudiCompactionService compactionService = mock(HudiCompactionService.class); + org.apache.flink.configuration.Configuration flinkConfig = + new org.apache.flink.configuration.Configuration(); + flinkConfig.set(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, true); + + when(hudiTableInfo.getWriteClient()).thenReturn(writeClient); + when(hudiTableInfo.getMetaClient()).thenReturn(metaClient); + 
when(hudiTableInfo.getFlinkConfig()).thenReturn(flinkConfig); + when(hudiTableInfo.getTableType()).thenReturn(HoodieTableType.MERGE_ON_READ); + when(hudiTableInfo.getTablePath()).thenReturn(TablePath.of("hudi", "mock_table")); + when(metaClient.getCommitActionType()).thenReturn("commit"); + return new TestingCommitterContext( + new HudiLakeCommitter(hudiTableInfo, ckpMetadata, compactionService), + writeClient, + metaClient, + ckpMetadata, + compactionService); + } + + private static void whenDataCommitSucceeds(TestingCommitterContext context) { + when(context.writeClient.commitStats( + eq(DATA_INSTANT), anyList(), any(Option.class), eq("commit"))) + .thenReturn(true); + } + + private static HudiCommittable committableWithCompaction() { + Map writeStats = new HashMap<>(); + writeStats.put( + DATA_INSTANT, new HudiWriteStats(Collections.singletonList(writeStat()), 0L)); + Map compactionWriteStats = new HashMap<>(); + compactionWriteStats.put( + COMPACTION_INSTANT, new HudiWriteStats(Collections.singletonList(writeStat()), 0L)); + return new HudiCommittable(writeStats, compactionWriteStats); + } + private static TableDescriptor createLogTableDescriptor() { return TableDescriptor.builder() .schema( @@ -181,8 +352,8 @@ private static TableDescriptor createLogTableDescriptor() { .column("name", DataTypes.STRING()) .build()) .distributedBy(1, "id") - .customProperty("hudi." 
+ FlinkOptions.RECORD_KEY_FIELD.key(), "id") - .customProperty("hudi.precombine.field", "name") + .customProperty(HUDI_CONF_PREFIX + FlinkOptions.RECORD_KEY_FIELD.key(), "id") + .customProperty(HUDI_CONF_PREFIX + "precombine.field", "name") .build(); } @@ -201,6 +372,28 @@ private static HoodieWriteStat writeStat() { return writeStat; } + private static class TestingCommitterContext { + + private final HudiLakeCommitter committer; + private final HoodieFlinkWriteClient writeClient; + private final HoodieTableMetaClient metaClient; + private final CkpMetadata ckpMetadata; + private final HudiCompactionService compactionService; + + private TestingCommitterContext( + HudiLakeCommitter committer, + HoodieFlinkWriteClient writeClient, + HoodieTableMetaClient metaClient, + CkpMetadata ckpMetadata, + HudiCompactionService compactionService) { + this.committer = committer; + this.writeClient = writeClient; + this.metaClient = metaClient; + this.ckpMetadata = ckpMetadata; + this.compactionService = compactionService; + } + } + private static class TestingWriterInitContext implements WriterInitContext { private final TablePath tablePath;