diff --git a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/source/HudiSplitPlanner.java b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/source/HudiSplitPlanner.java
index b57c984b06..7bc10e2dfd 100644
--- a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/source/HudiSplitPlanner.java
+++ b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/source/HudiSplitPlanner.java
@@ -119,6 +119,11 @@ private List planPartition(
                         .getLatestMergedFileSlicesBeforeOrOn(partitionPath, snapshotTime)
                         .collect(Collectors.toList());
         for (FileSlice fileSlice : fileSlices) {
+            // Pending MOR compaction plans can expose empty file slices; they are not readable
+            // at the requested completed instant.
+            if (fileSlice.isEmpty()) {
+                continue;
+            }
             splits.add(toHudiSplit(hudiTableInfo, partitionPath, fileSlice));
         }
         return splits;
diff --git a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/tiering/HudiCompactionService.java b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/tiering/HudiCompactionService.java
new file mode 100644
index 0000000000..f03ad27175
--- /dev/null
+++ b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/tiering/HudiCompactionService.java
@@ -0,0 +1,630 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.hudi.tiering;
+
+import org.apache.fluss.metadata.TableBucket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.hudi.avro.model.HoodieCompactionOperation;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieListData;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.CompactionOperation;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.InstantGenerator;
+import org.apache.hudi.common.util.CompactionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.index.bucket.BucketIdentifier;
+import org.apache.hudi.sink.compact.FlinkCompactionConfig;
+import org.apache.hudi.sink.compact.strategy.CompactionPlanStrategies;
+import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.table.action.compact.CompactHelpers;
+import org.apache.hudi.table.action.compact.HoodieFlinkMergeOnReadTableCompactor;
+import org.apache.hudi.table.format.FlinkRowDataReaderContext;
+import org.apache.hudi.table.format.InternalSchemaManager;
+import org.apache.hudi.util.CompactionUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
+
+/**
+ * Coordinates Hudi compaction scheduling, execution, and commit for tiering writers.
+ *
+ * <p>Instances from {@link #forScheduler(HudiWriteTableInfo)} carry no table bucket; {@link
+ * #executeCompaction(List)} requires one and therefore needs an instance from {@link
+ * #forExecutor(HudiWriteTableInfo, TableBucket, String)}.
+ */
+public class HudiCompactionService {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HudiCompactionService.class);
+
+    private final HudiWriteTableInfo hudiTableInfo;
+    private final HoodieFlinkWriteClient<?> writeClient;
+    private final HoodieTableMetaClient metaClient;
+    private final HoodieFlinkTable<?> table;
+    private final Configuration conf;
+    @Nullable private final TableBucket tableBucket;
+    @Nullable private final String partition;
+
+    /** Created lazily by the reader context supplier; reset before each plan is executed. */
+    private InternalSchemaManager internalSchemaManager;
+
+    /** Creates a service without a table bucket or partition. */
+    public static HudiCompactionService forScheduler(HudiWriteTableInfo hudiTableInfo) {
+        return new HudiCompactionService(hudiTableInfo, null, null);
+    }
+
+    /** Creates a service bound to the given table bucket and optional Fluss partition name. */
+    public static HudiCompactionService forExecutor(
+            HudiWriteTableInfo hudiTableInfo, TableBucket tableBucket, @Nullable String partition) {
+        return new HudiCompactionService(hudiTableInfo, tableBucket, partition);
+    }
+
+    private HudiCompactionService(
+            HudiWriteTableInfo hudiTableInfo,
+            @Nullable TableBucket tableBucket,
+            @Nullable String partition) {
+        this.hudiTableInfo = checkNotNull(hudiTableInfo, "Hudi write table info must not be null.");
+        this.writeClient = hudiTableInfo.getWriteClient();
+        this.metaClient = hudiTableInfo.getMetaClient();
+        this.table = writeClient.getHoodieTable();
+        this.conf = hudiTableInfo.getFlinkConfig();
+        this.tableBucket = tableBucket;
+        this.partition = partition;
+    }
+
+    /**
+     * Asks the write client to schedule a compaction, reloading the active timeline before and
+     * after the attempt.
+     *
+     * @return {@code true} if the write client reported a scheduled compaction instant
+     * @throws IOException if scheduling or the subsequent timeline reload fails
+     */
+    public boolean scheduleCompaction() throws IOException {
+        LOG.info("Scheduling Hudi compaction for table {}.", hudiTableInfo.getTablePath());
+        metaClient.reloadActiveTimeline();
+        try {
+            boolean scheduled = writeClient.scheduleCompaction(Option.empty()).isPresent();
+            metaClient.reloadActiveTimeline();
+            if (!scheduled) {
+                LOG.info(
+                        "No Hudi compaction plan was scheduled for table {}.",
+                        hudiTableInfo.getTablePath());
+            }
+            return scheduled;
+        } catch (Exception e) {
+            throw new IOException(
+                    "Failed to schedule Hudi compaction for table "
+                            + hudiTableInfo.getTablePath()
+                            + ".",
+                    e);
+        }
+    }
+
+    /**
+     * Moves the currently selected pending compactions to the inflight state.
+     *
+     * <p>Selected instants whose plan is missing or has no operations are dropped first. Selected
+     * instants that are already inflight are rolled back as stale, and every selected instant
+     * that is then present as requested is transitioned to inflight.
+     */
+    public void markSelectedCompactionsInflight() {
+        List<String> compactionInstantTimes = getSelectedCompactionInstantTimes();
+        if (compactionInstantTimes.isEmpty()) {
+            return;
+        }
+        compactionInstantTimes = validateSelectedCompactionInstantTimes(compactionInstantTimes);
+        if (compactionInstantTimes.isEmpty()) {
+            return;
+        }
+
+        HoodieTimeline pendingCompactionTimeline =
+                table.getActiveTimeline().filterPendingCompactionTimeline();
+        InstantGenerator instantGenerator = table.getInstantGenerator();
+
+        for (String timestamp : compactionInstantTimes) {
+            HoodieInstant inflightInstant =
+                    instantGenerator.getCompactionInflightInstant(timestamp);
+            if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
+                LOG.info("Rollback stale inflight Hudi compaction instant {}.", timestamp);
+                table.rollbackInflightCompaction(
+                        inflightInstant, writeClient.getTransactionManager());
+                metaClient.reloadActiveTimeline();
+            }
+        }
+
+        pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
+        for (String timestamp : compactionInstantTimes) {
+            HoodieInstant requestedInstant =
+                    instantGenerator.getCompactionRequestedInstant(timestamp);
+            if (pendingCompactionTimeline.containsInstant(requestedInstant)) {
+                // Each Fluss tiering table has a single lake committer, so requested->inflight
+                // transitions are serialized by the committer task for this table.
+                table.getActiveTimeline().transitionCompactionRequestedToInflight(requestedInstant);
+                LOG.info("Marked Hudi compaction instant {} as inflight.", timestamp);
+            }
+        }
+        metaClient.reloadActiveTimeline();
+    }
+
+    /**
+     * Returns the requested times of the selected compaction instants that are inflight on the
+     * active timeline.
+     */
+    public List<String> getInflightCompactionInstantTimes() {
+        List<String> compactionInstantTimes = getSelectedCompactionInstantTimes();
+        if (compactionInstantTimes.isEmpty()) {
+            return Collections.emptyList();
+        }
+
+        HoodieTimeline pendingCompactionTimeline =
+                metaClient.getActiveTimeline().filterPendingCompactionTimeline();
+        InstantGenerator instantGenerator = table.getInstantGenerator();
+
+        List<String> inflightCompactionInstantTimes = new ArrayList<>();
+        for (String timestamp : compactionInstantTimes) {
+            HoodieInstant inflightInstant =
+                    instantGenerator.getCompactionInflightInstant(timestamp);
+            if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
+                inflightCompactionInstantTimes.add(inflightInstant.requestedTime());
+            }
+        }
+        LOG.info(
+                "Found {} inflight Hudi compaction instants for table {}.",
+                inflightCompactionInstantTimes.size(),
+                hudiTableInfo.getTablePath());
+        return inflightCompactionInstantTimes;
+    }
+
+    /**
+     * Loads the compaction plan of each given instant, keeping only plans that contain operations.
+     *
+     * @param compactionInstantTimes compaction instant times to load; may be null or empty
+     * @return pairs of instant time and plan, or an empty list if nothing was requested
+     * @throws HoodieException if a plan cannot be read
+     */
+    public List<Pair<String, HoodieCompactionPlan>> getCompactionPlans(
+            List<String> compactionInstantTimes) {
+        if (compactionInstantTimes == null || compactionInstantTimes.isEmpty()) {
+            return Collections.emptyList();
+        }
+
+        List<Pair<String, HoodieCompactionPlan>> compactionPlans =
+                compactionInstantTimes.stream()
+                        .map(timestamp -> Pair.of(timestamp, loadCompactionPlan(timestamp)))
+                        .filter(pair -> isValidCompactionPlan(pair.getRight()))
+                        .collect(Collectors.toList());
+
+        LOG.info(
+                "Loaded {} Hudi compaction plans for table {}.",
+                compactionPlans.size(),
+                hudiTableInfo.getTablePath());
+        return compactionPlans;
+    }
+
+    /**
+     * Executes the operations of the given plans that belong to this service's partition and
+     * table bucket. A table bucket is required when plans are given.
+     *
+     * <p>If any operation of a plan fails, that compaction instant is rolled back and the failure
+     * is rethrown; a rollback failure is attached as a suppressed exception.
+     *
+     * @param compactionPlans pairs of compaction instant time and plan; may be null or empty
+     * @return the write statuses produced per compaction instant time
+     * @throws IOException if executing a compaction operation fails
+     */
+    public Map<String, List<WriteStatus>> executeCompaction(
+            List<Pair<String, HoodieCompactionPlan>> compactionPlans) throws IOException {
+        if (compactionPlans == null || compactionPlans.isEmpty()) {
+            return Collections.emptyMap();
+        }
+        TableBucket currentBucket =
+                checkNotNull(tableBucket, "Hudi compaction execution requires a table bucket.");
+
+        Map<String, List<WriteStatus>> writeStatusesByInstant = new HashMap<>();
+        HoodieWriteConfig writeConfig = writeClient.getConfig();
+        String hudiPartitionPath = toHudiPartitionPath(partition);
+
+        for (Pair<String, HoodieCompactionPlan> planPair : compactionPlans) {
+            String instantTime = planPair.getLeft();
+            HoodieCompactionPlan compactionPlan = planPair.getRight();
+            metaClient.reload();
+            try {
+                CompactionUtil.setAvroSchema(writeConfig, metaClient);
+                internalSchemaManager = null;
+                HoodieReaderContext<?> readerContext = createReaderContext();
+                for (HoodieCompactionOperation operation : compactionPlan.getOperations()) {
+                    if (!belongsToCurrentWriter(operation, hudiPartitionPath, currentBucket)) {
+                        continue;
+                    }
+
+                    HoodieFlinkMergeOnReadTableCompactor<?> compactor =
+                            new HoodieFlinkMergeOnReadTableCompactor<>();
+                    CompactionOperation compactionOperation =
+                            CompactionOperation.convertFromAvroRecordInstance(operation);
+
+                    List<WriteStatus> writeStatuses =
+                            compactor.compact(
+                                    writeConfig,
+                                    compactionOperation,
+                                    instantTime,
+                                    table.getTaskContextSupplier(),
+                                    readerContext,
+                                    table);
+                    writeStatusesByInstant
+                            .computeIfAbsent(instantTime, ignored -> new ArrayList<>())
+                            .addAll(writeStatuses);
+                    LOG.info(
+                            "Compacted Hudi file id {} for table {}, partition {}, bucket {}, instant {}.",
+                            operation.getFileId(),
+                            hudiTableInfo.getTablePath(),
+                            hudiPartitionPath,
+                            currentBucket.getBucket(),
+                            instantTime);
+                }
+            } catch (Exception e) {
+                writeStatusesByInstant.remove(instantTime);
+                throw rollbackCompactionAfterFailure(
+                        instantTime, hudiPartitionPath, currentBucket, e);
+            }
+        }
+        return writeStatusesByInstant;
+    }
+
+    /**
+     * Commits the given compaction instants in sorted (natural string) order of instant time.
+     *
+     * <p>An instant that contains error records is rolled back and fails the commit unless {@link
+     * FlinkOptions#IGNORE_FAILED} is set.
+     *
+     * @param compactionWriteStats write stats per compaction instant time; may be null or empty
+     * @param snapshotProperties properties added to the metadata of every committed instant
+     * @return the last instant time in sorted order, or {@code null} if nothing was given
+     * @throws IOException if an instant contains error records that must not be ignored
+     */
+    @Nullable
+    public String commitCompaction(
+            Map<String, HudiWriteStats> compactionWriteStats,
+            Map<String, String> snapshotProperties)
+            throws IOException {
+        if (compactionWriteStats == null || compactionWriteStats.isEmpty()) {
+            return null;
+        }
+
+        List<String> compactionInstants = new ArrayList<>(compactionWriteStats.keySet());
+        Collections.sort(compactionInstants);
+        String latestInstant = compactionInstants.get(compactionInstants.size() - 1);
+        for (String compactionInstant : compactionInstants) {
+            HudiWriteStats writeStats = compactionWriteStats.get(compactionInstant);
+
+            if (writeStats.getTotalErrorRecords() > 0 && !conf.get(FlinkOptions.IGNORE_FAILED)) {
+                LOG.warn(
+                        "Rollback Hudi compaction instant {} because it contains {} error records.",
+                        compactionInstant,
+                        writeStats.getTotalErrorRecords());
+                CompactionUtil.rollbackCompaction(
+                        table, compactionInstant, writeClient.getTransactionManager());
+                throw new IOException(
+                        "Failed to commit Hudi compaction instant "
+                                + compactionInstant
+                                + " because it contains "
+                                + writeStats.getTotalErrorRecords()
+                                + " error records.");
+            }
+
+            doCommitCompaction(compactionInstant, writeStats, snapshotProperties);
+            LOG.info("Committed Hudi compaction instant {}.", compactionInstant);
+        }
+        return latestInstant;
+    }
+
+    /**
+     * Reloads the active timeline and returns the instant times that the configured compaction
+     * plan strategy selects from the pending compactions.
+     */
+    private List<String> getSelectedCompactionInstantTimes() {
+        metaClient.reloadActiveTimeline();
+        HoodieTimeline pendingCompactionTimeline =
+                table.getActiveTimeline().filterPendingCompactionTimeline();
+        FlinkCompactionConfig compactionConfig = toFlinkCompactionConfig(conf);
+        List<HoodieInstant> requestedInstants =
+                CompactionPlanStrategies.getStrategy(compactionConfig)
+                        .select(pendingCompactionTimeline);
+        if (requestedInstants.isEmpty()) {
+            LOG.info(
+                    "No pending Hudi compaction plan found for table {}.",
+                    hudiTableInfo.getTablePath());
+            return Collections.emptyList();
+        }
+        return requestedInstants.stream()
+                .map(HoodieInstant::requestedTime)
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Keeps the instant times whose plan contains operations. An instant with an invalid plan is
+     * rolled back if it is inflight and skipped otherwise.
+     */
+    private List<String> validateSelectedCompactionInstantTimes(
+            List<String> compactionInstantTimes) {
+        HoodieTimeline pendingCompactionTimeline =
+                table.getActiveTimeline().filterPendingCompactionTimeline();
+        InstantGenerator instantGenerator = table.getInstantGenerator();
+        List<String> validCompactionInstantTimes = new ArrayList<>();
+        for (String timestamp : compactionInstantTimes) {
+            HoodieCompactionPlan compactionPlan = loadCompactionPlan(timestamp);
+            if (isValidCompactionPlan(compactionPlan)) {
+                validCompactionInstantTimes.add(timestamp);
+                continue;
+            }
+
+            HoodieInstant inflightInstant =
+                    instantGenerator.getCompactionInflightInstant(timestamp);
+            if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
+                LOG.warn(
+                        "Rollback invalid inflight Hudi compaction instant {} for table {}.",
+                        timestamp,
+                        hudiTableInfo.getTablePath());
+                table.rollbackInflightCompaction(
+                        inflightInstant, writeClient.getTransactionManager());
+                metaClient.reloadActiveTimeline();
+            } else {
+                LOG.warn(
+                        "Skip invalid Hudi compaction plan {} for table {}.",
+                        timestamp,
+                        hudiTableInfo.getTablePath());
+            }
+        }
+        return validCompactionInstantTimes;
+    }
+
+    /** Reads the plan of the given compaction instant; failures become a HoodieException. */
+    private HoodieCompactionPlan loadCompactionPlan(String timestamp) {
+        try {
+            return CompactionUtils.getCompactionPlan(metaClient, timestamp);
+        } catch (Exception e) {
+            throw new HoodieException("Failed to get Hudi compaction plan " + timestamp, e);
+        }
+    }
+
+    /** Creates the reader context used by the compactor; its schema manager is built lazily. */
+    private HoodieReaderContext<?> createReaderContext() {
+        Supplier<InternalSchemaManager> internalSchemaManagerSupplier =
+                () -> {
+                    if (internalSchemaManager == null) {
+                        internalSchemaManager =
+                                InternalSchemaManager.get(metaClient.getStorageConf(), metaClient);
+                    }
+                    return internalSchemaManager;
+                };
+        StorageConfiguration<?> readerConf = writeClient.getEngineContext().getStorageConf();
+        return new FlinkRowDataReaderContext(
+                readerConf,
+                internalSchemaManagerSupplier,
+                Collections.emptyList(),
+                metaClient.getTableConfig(),
+                Option.empty());
+    }
+
+    /**
+     * Rolls back the given compaction instant and returns the exception to throw. A rollback
+     * failure is added to the returned exception as suppressed.
+     */
+    private IOException rollbackCompactionAfterFailure(
+            String instantTime,
+            String hudiPartitionPath,
+            TableBucket currentBucket,
+            Exception cause) {
+        IOException failure =
+                new IOException(
+                        String.format(
+                                "Failed to execute Hudi compaction for table %s, partition %s, bucket %s, instant %s.",
+                                hudiTableInfo.getTablePath(),
+                                hudiPartitionPath,
+                                currentBucket.getBucket(),
+                                instantTime),
+                        cause);
+        try {
+            CompactionUtil.rollbackCompaction(
+                    table, instantTime, writeClient.getTransactionManager());
+            metaClient.reloadActiveTimeline();
+        } catch (Exception rollbackFailure) {
+            failure.addSuppressed(
+                    new IOException(
+                            "Failed to rollback Hudi compaction instant " + instantTime + ".",
+                            rollbackFailure));
+        }
+        return failure;
+    }
+
+    /**
+     * Completes one compaction instant with the given write stats and extra metadata, and runs
+     * cleaning inline unless async cleaning is enabled.
+     */
+    private void doCommitCompaction(
+            String instant, HudiWriteStats writeStats, Map<String, String> snapshotProperties)
+            throws IOException {
+        HoodieCommitMetadata metadata =
+                CompactHelpers.getInstance()
+                        .createCompactionMetadata(
+                                table,
+                                instant,
+                                HoodieListData.eager(toWriteStatuses(writeStats)),
+                                writeClient.getConfig().getSchema());
+        if (snapshotProperties != null) {
+            snapshotProperties.forEach(metadata::addMetadata);
+        }
+
+        writeClient.completeCompaction(metadata, table, instant);
+        if (!conf.get(FlinkOptions.CLEAN_ASYNC_ENABLED)) {
+            writeClient.clean();
+        }
+        metaClient.reloadActiveTimeline();
+    }
+
+    /** Returns whether the operation targets both the given partition path and table bucket. */
+    private static boolean belongsToCurrentWriter(
+            HoodieCompactionOperation operation,
+            String hudiPartitionPath,
+            TableBucket currentBucket) {
+        return belongsToCurrentPartition(operation, hudiPartitionPath)
+                && belongsToCurrentBucket(operation, currentBucket);
+    }
+
+    /** Compares the operation's partition path (null treated as empty) with the given path. */
+    private static boolean belongsToCurrentPartition(
+            HoodieCompactionOperation operation, String hudiPartitionPath) {
+        String operationPartitionPath = normalizePartitionPath(operation.getPartitionPath());
+        return Objects.equals(operationPartitionPath, hudiPartitionPath);
+    }
+
+    /**
+     * Returns whether the bucket id parsed from the operation's file id equals the given bucket.
+     * A missing or unparsable file id never matches.
+     */
+    private static boolean belongsToCurrentBucket(
+            HoodieCompactionOperation operation, TableBucket currentBucket) {
+        String fileId = operation.getFileId();
+        if (fileId == null || fileId.isEmpty()) {
+            return false;
+        }
+        try {
+            return BucketIdentifier.bucketIdFromFileId(fileId) == currentBucket.getBucket();
+        } catch (RuntimeException e) {
+            LOG.warn("Failed to parse Hudi bucket id from file id {}.", fileId, e);
+            return false;
+        }
+    }
+
+    /**
+     * Converts a Fluss partition name, whose values are separated by '$', into a Hudi partition
+     * path. With hive-style partitioning and configured partition fields, each segment becomes
+     * {@code field=value}.
+     *
+     * @throws IOException if the number of values differs from the number of partition fields
+     */
+    private String toHudiPartitionPath(@Nullable String partitionName) throws IOException {
+        if (partitionName == null || partitionName.isEmpty()) {
+            return "";
+        }
+
+        String partitionPath = partitionName.replace('$', '/');
+        if (!conf.get(FlinkOptions.HIVE_STYLE_PARTITIONING)) {
+            return partitionPath;
+        }
+
+        String partitionFields = conf.get(FlinkOptions.PARTITION_PATH_FIELD);
+        if (partitionFields == null || partitionFields.trim().isEmpty()) {
+            return partitionPath;
+        }
+
+        String[] fields = partitionFields.split(",");
+        String[] values = partitionName.split("\\$");
+        if (fields.length != values.length) {
+            throw new IOException(
+                    String.format(
+                            "Invalid Fluss partition name '%s' for Hudi hive-style partition fields '%s'. Expected %s values separated by '$' but got %s.",
+                            partitionName, partitionFields, fields.length, values.length));
+        }
+
+        List<String> hiveStylePartitionSegments = new ArrayList<>(fields.length);
+        for (int i = 0; i < fields.length; i++) {
+            hiveStylePartitionSegments.add(fields[i].trim() + "=" + values[i]);
+        }
+        return String.join("/", hiveStylePartitionSegments);
+    }
+
+    /** Maps a null partition path to the empty string. */
+    private static String normalizePartitionPath(@Nullable String partitionPath) {
+        return partitionPath == null ? "" : partitionPath;
+    }
+
+    /** Wraps each write stat in a new WriteStatus carrying its file id and partition path. */
+    private static List<WriteStatus> toWriteStatuses(HudiWriteStats writeStats) {
+        List<HoodieWriteStat> stats = writeStats.getWriteStats();
+        if (stats.isEmpty()) {
+            return Collections.emptyList();
+        }
+
+        List<WriteStatus> writeStatuses = new ArrayList<>(stats.size());
+        for (HoodieWriteStat stat : stats) {
+            WriteStatus writeStatus = new WriteStatus();
+            writeStatus.setStat(stat);
+            writeStatus.setFileId(stat.getFileId());
+            writeStatus.setPartitionPath(stat.getPartitionPath());
+            writeStatuses.add(writeStatus);
+        }
+        return writeStatuses;
+    }
+
+    /** A plan is valid when it is non-null and contains at least one operation. */
+    private static boolean isValidCompactionPlan(HoodieCompactionPlan plan) {
+        return plan != null && plan.getOperations() != null && !plan.getOperations().isEmpty();
+    }
+
+    /** Copies the compaction, cleaning, and archiving options from the Flink configuration. */
+    static FlinkCompactionConfig toFlinkCompactionConfig(Configuration config) {
+        FlinkCompactionConfig compactionConfig = new FlinkCompactionConfig();
+
+        // Hudi does not expose a reverse helper for FlinkCompactionConfig. Keep this mapping
+        // focused on the scheduling and cleaning fields used by Fluss tiering; CLI/service-mode
+        // fields intentionally keep Hudi defaults.
+        compactionConfig.path = config.get(FlinkOptions.PATH);
+        compactionConfig.compactionTriggerStrategy =
+                config.get(FlinkOptions.COMPACTION_TRIGGER_STRATEGY);
+        compactionConfig.archiveMaxCommits = config.get(FlinkOptions.ARCHIVE_MAX_COMMITS);
+        compactionConfig.archiveMinCommits = config.get(FlinkOptions.ARCHIVE_MIN_COMMITS);
+        compactionConfig.cleanPolicy = config.get(FlinkOptions.CLEAN_POLICY);
+        compactionConfig.cleanRetainCommits = config.get(FlinkOptions.CLEAN_RETAIN_COMMITS);
+        compactionConfig.cleanRetainHours = config.get(FlinkOptions.CLEAN_RETAIN_HOURS);
+        compactionConfig.cleanRetainFileVersions =
+                config.get(FlinkOptions.CLEAN_RETAIN_FILE_VERSIONS);
+        compactionConfig.compactionDeltaCommits = config.get(FlinkOptions.COMPACTION_DELTA_COMMITS);
+        compactionConfig.compactionDeltaSeconds = config.get(FlinkOptions.COMPACTION_DELTA_SECONDS);
+        compactionConfig.compactionMaxMemory = config.get(FlinkOptions.COMPACTION_MAX_MEMORY);
+        compactionConfig.compactionTargetIo = config.get(FlinkOptions.COMPACTION_TARGET_IO);
+        compactionConfig.compactionTasks = config.get(FlinkOptions.COMPACTION_TASKS);
+        compactionConfig.cleanAsyncEnable = config.get(FlinkOptions.CLEAN_ASYNC_ENABLED);
+        compactionConfig.schedule = config.get(FlinkOptions.COMPACTION_SCHEDULE_ENABLED);
+        return compactionConfig;
+    }
+}
diff --git a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/tiering/HudiLakeCommitter.java b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/tiering/HudiLakeCommitter.java
index 44d898e29f..47313e4072 100644
--- a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/tiering/HudiLakeCommitter.java
+++ b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/tiering/HudiLakeCommitter.java
@@ -27,11 +27,14 @@
 
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieTableType;
+import
org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.util.CompactionUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,12 +42,11 @@ import java.io.IOException; import java.io.UncheckedIOException; +import java.util.Collections; import java.util.HashMap; -import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Set; import static org.apache.fluss.lake.writer.LakeTieringFactory.FLUSS_LAKE_TIERING_COMMIT_USER; @@ -58,6 +60,7 @@ public class HudiLakeCommitter implements LakeCommitter writeClient; private final CkpMetadata ckpMetadata; + private final HudiCompactionService compactionService; public HudiLakeCommitter( HudiCatalogProvider hudiCatalogProvider, @@ -67,10 +70,21 @@ public HudiLakeCommitter( this.hudiTableInfo = HudiWriteTableInfo.create(hudiCatalogProvider, tablePath); this.writeClient = hudiTableInfo.getWriteClient(); this.ckpMetadata = ckpMetadataProvider.get(tablePath, hudiTableInfo); + this.compactionService = HudiCompactionService.forScheduler(hudiTableInfo); LOG.info( "Created HudiLakeCommitter with configuration {}.", hudiTableInfo.getFlinkConfig()); } + HudiLakeCommitter( + HudiWriteTableInfo hudiTableInfo, + CkpMetadata ckpMetadata, + HudiCompactionService compactionService) { + this.hudiTableInfo = hudiTableInfo; + this.writeClient = hudiTableInfo.getWriteClient(); + this.ckpMetadata = ckpMetadata; + this.compactionService = compactionService; + } + @Override public HudiCommittable toCommittable(List hudiWriteResults) { HudiCommittable.Builder committableBuilder = HudiCommittable.builder(); @@ -85,8 +99,6 @@ public HudiCommittable toCommittable(List hudiWriteResults) { public 
LakeCommitResult commit( HudiCommittable committable, Map snapshotProperties) throws IOException { - ensureNoCompactionWriteStats(committable); - Map writeStatsByInstant = committable.getWriteStats(); if (writeStatsByInstant.size() != 1) { throw new IOException( @@ -129,6 +141,7 @@ public LakeCommitResult commit( ckpMetadata.commitInstant(instant); LOG.info("Committed Hudi instant {} successfully.", instant); + commitCompactionAndSchedule(committable.getCompactionWriteStats()); return LakeCommitResult.committedIsReadable(parseSnapshotId(instant)); } catch (Exception e) { if (e instanceof IOException) { @@ -140,10 +153,21 @@ public LakeCommitResult commit( @Override public void abort(HudiCommittable committable) throws IOException { - Set instants = new LinkedHashSet<>(committable.getWriteStats().keySet()); - instants.addAll(committable.getCompactionWriteStats().keySet()); IOException failure = null; - for (String instant : instants) { + for (String instant : committable.getCompactionWriteStats().keySet()) { + try { + abortCompactionInstant(instant); + LOG.info("Aborted Hudi compaction instant {}.", instant); + } catch (IOException e) { + failure = + addSuppressed( + failure, + new IOException( + "Failed to abort Hudi compaction instant " + instant + ".", + e)); + } + } + for (String instant : committable.getWriteStats().keySet()) { try { abortInstant(instant); LOG.info("Aborted Hudi instant {}.", instant); @@ -160,17 +184,6 @@ public void abort(HudiCommittable committable) throws IOException { } } - private static void ensureNoCompactionWriteStats(HudiCommittable committable) - throws IOException { - Map compactionWriteStats = committable.getCompactionWriteStats(); - if (!compactionWriteStats.isEmpty()) { - throw new IOException( - "Hudi compaction write stats are not supported yet, but got instants " - + compactionWriteStats.keySet() - + "."); - } - } - @Nullable @Override public CommittedLakeSnapshot getMissingLakeSnapshot(@Nullable Long 
latestLakeSnapshotIdOfFluss) @@ -207,6 +220,34 @@ public void close() throws Exception { IOUtils.closeQuietly(hudiTableInfo, "hudi table info"); } + private void commitCompactionAndSchedule(Map compactionWriteStats) + throws IOException { + if (!isAutoCompactionEnabled()) { + return; + } + + String latestCommittedInstant = + compactionService.commitCompaction(compactionWriteStats, Collections.emptyMap()); + if (latestCommittedInstant != null) { + LOG.info("Committed Hudi compaction instant {}.", latestCommittedInstant); + } + + try { + compactionService.scheduleCompaction(); + compactionService.markSelectedCompactionsInflight(); + } catch (Exception e) { + LOG.warn( + "Failed to schedule Hudi compaction for table {}.", + hudiTableInfo.getTablePath(), + e); + } + } + + private boolean isAutoCompactionEnabled() { + return hudiTableInfo.getTableType() == HoodieTableType.MERGE_ON_READ + && hudiTableInfo.getFlinkConfig().get(FlinkOptions.COMPACTION_SCHEDULE_ENABLED); + } + private void validateWriteStats(String instant, HudiWriteStats writeStats) { long totalErrorRecords = writeStats.getTotalErrorRecords(); if (totalErrorRecords > 0 @@ -250,6 +291,16 @@ private void abortInstant(String instant) throws IOException { } } + private void abortCompactionInstant(String instant) throws IOException { + try { + hudiTableInfo.getMetaClient().reloadActiveTimeline(); + CompactionUtil.rollbackCompaction( + writeClient.getHoodieTable(), instant, writeClient.getTransactionManager()); + } catch (Exception e) { + throw new IOException("Failed to rollback Hudi compaction instant " + instant + ".", e); + } + } + private HoodieTimeline getCompletedTimelineCommittedBy(String commitUser) throws IOException { hudiTableInfo.getMetaClient().reloadActiveTimeline(); HoodieTimeline timeline = @@ -274,7 +325,9 @@ private static boolean isCommittedBy( throw new IOException("Failed to load committed Hudi instant metadata."); } Map extraMetadata = metadata.getExtraMetadata(); - return extraMetadata != 
null && commitUser.equals(extraMetadata.get(COMMITTER_USER)); + return metadata.getOperationType() != WriteOperationType.COMPACT + && extraMetadata != null + && commitUser.equals(extraMetadata.get(COMMITTER_USER)); } catch (IOException e) { // a read failure must not be silently treated as "not committed by Fluss", // otherwise we may miss an already-tiered snapshot and re-commit duplicated data diff --git a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/tiering/HudiLakeWriter.java b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/tiering/HudiLakeWriter.java index 1b9c869a24..c01415e1e1 100644 --- a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/tiering/HudiLakeWriter.java +++ b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/tiering/HudiLakeWriter.java @@ -24,21 +24,35 @@ import org.apache.fluss.lake.writer.WriterInitContext; import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.record.LogRecord; +import org.apache.fluss.utils.TimeUtils; +import org.apache.fluss.utils.concurrent.ExecutorThreadFactory; import org.apache.flink.configuration.Configuration; +import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.CommitUtils; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.configuration.FlinkOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + import java.io.IOException; +import java.time.Duration; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; +import 
java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import static org.apache.fluss.lake.writer.WriterInitContext.UNKNOWN_SPLIT_INDEX; import static org.apache.fluss.lake.writer.WriterInitContext.UNKNOWN_TIERING_ROUND_TIMESTAMP; @@ -49,10 +63,21 @@ public class HudiLakeWriter implements LakeWriter { private static final Logger LOG = LoggerFactory.getLogger(HudiLakeWriter.class); + private static final String COMPACTION_COMPLETE_TIMEOUT_KEY = + "fluss.tiering.compaction.complete-timeout"; + private static final String COMPACTION_SHUTDOWN_TIMEOUT_KEY = + "fluss.tiering.compaction.shutdown-timeout"; + private static final Duration DEFAULT_COMPACTION_COMPLETE_TIMEOUT = Duration.ofMinutes(30); + private static final Duration DEFAULT_COMPACTION_SHUTDOWN_TIMEOUT = Duration.ofSeconds(30); + private final RecordWriter recordWriter; private final TableInfo tableInfo; private final HudiWriteTableInfo hudiTableInfo; private final CkpMetadata ckpMetadata; + private final long compactionCompleteTimeoutMs; + private final long compactionShutdownTimeoutMs; + @Nullable private final ExecutorService compactionExecutor; + @Nullable private final CompletableFuture>> compactionFuture; public HudiLakeWriter( HudiCatalogProvider hudiCatalogProvider, @@ -64,6 +89,16 @@ public HudiLakeWriter( this.hudiTableInfo = HudiWriteTableInfo.create(hudiCatalogProvider, tableInfo.getTablePath()); this.ckpMetadata = ckpMetadataProvider.get(tableInfo.getTablePath(), hudiTableInfo); + this.compactionCompleteTimeoutMs = + getTimeoutMillis( + hudiTableInfo.getFlinkConfig(), + COMPACTION_COMPLETE_TIMEOUT_KEY, + DEFAULT_COMPACTION_COMPLETE_TIMEOUT); + this.compactionShutdownTimeoutMs = + getTimeoutMillis( + hudiTableInfo.getFlinkConfig(), + COMPACTION_SHUTDOWN_TIMEOUT_KEY, + DEFAULT_COMPACTION_SHUTDOWN_TIMEOUT); if (writerInitContext.splitIndex() == 0) { ckpMetadata.bootstrap(); @@ -75,7 +110,19 @@ 
public HudiLakeWriter( } this.recordWriter = new HudiRecordWriter(writerInitContext, hudiTableInfo, ckpMetadata); + ExecutorService createdCompactionExecutor = null; + if (shouldRunCompaction()) { + createdCompactionExecutor = + Executors.newSingleThreadExecutor( + new ExecutorThreadFactory( + "hudi-compact-" + writerInitContext.tableBucket())); + } + this.compactionExecutor = createdCompactionExecutor; LOG.info("Created HudiLakeWriter with configuration {}.", hudiTableInfo.getFlinkConfig()); + this.compactionFuture = + compactionExecutor == null + ? null + : executeCompactionAsync(hudiCatalogProvider, writerInitContext); } @Override @@ -91,7 +138,13 @@ public void write(LogRecord record) throws IOException { public HudiWriteResult complete() throws IOException { try { Map> writeStatuses = recordWriter.complete(); - return HudiWriteResult.fromWriteStatuses(writeStatuses, Collections.emptyMap()); + Map> compactionWriteStatuses = Collections.emptyMap(); + if (compactionFuture != null) { + compactionWriteStatuses = waitForCompaction(); + } + return HudiWriteResult.fromWriteStatuses(writeStatuses, compactionWriteStatuses); + } catch (IOException e) { + throw e; } catch (Exception e) { throw new IOException("Failed to complete Hudi write.", e); } @@ -99,14 +152,122 @@ public HudiWriteResult complete() throws IOException { @Override public void close() throws IOException { + IOException failure = closeCompactionExecutor(); + failure = close(failure, recordWriter, "Hudi record writer"); + failure = close(failure, ckpMetadata, "Hudi checkpoint metadata"); + if (failure != null) { + throw failure; + } + } + + @Nullable + private IOException closeCompactionExecutor() { + if (compactionExecutor == null) { + return null; + } + try { - recordWriter.close(); - ckpMetadata.close(); + cancelCompaction(); + compactionExecutor.shutdown(); + if (!compactionExecutor.awaitTermination( + compactionShutdownTimeoutMs, TimeUnit.MILLISECONDS)) { + compactionExecutor.shutdownNow(); + if 
(!compactionExecutor.awaitTermination( + compactionShutdownTimeoutMs, TimeUnit.MILLISECONDS)) { + return new IOException( + String.format( + "Failed to close Hudi compaction executor within %s ms.", + compactionShutdownTimeoutMs)); + } + } + return null; + } catch (InterruptedException e) { + compactionExecutor.shutdownNow(); + Thread.currentThread().interrupt(); + return new IOException("Interrupted while closing Hudi compaction executor.", e); } catch (Exception e) { - throw new IOException("Failed to close HudiLakeWriter.", e); + return new IOException("Failed to close Hudi compaction executor.", e); + } + } + + private Map> waitForCompaction() throws IOException { + try { + return compactionFuture.get(compactionCompleteTimeoutMs, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + cancelCompaction(); + throw new IOException( + String.format( + "Timed out after %s ms waiting for Hudi compaction to finish.", + compactionCompleteTimeoutMs), + e); + } catch (InterruptedException e) { + cancelCompaction(); + Thread.currentThread().interrupt(); + throw new IOException("Interrupted while waiting for Hudi compaction to finish.", e); + } catch (ExecutionException e) { + throw new IOException("Failed to execute Hudi compaction.", e.getCause()); + } + } + + private void cancelCompaction() { + if (compactionFuture != null && !compactionFuture.isDone()) { + compactionFuture.cancel(true); } } + @Nullable + private static IOException close( + @Nullable IOException failure, AutoCloseable closeable, String resourceName) { + try { + closeable.close(); + return failure; + } catch (Exception e) { + IOException closeException = + new IOException("Failed to close " + resourceName + ".", e); + if (failure == null) { + return closeException; + } + failure.addSuppressed(closeException); + return failure; + } + } + + private boolean shouldRunCompaction() { + return hudiTableInfo.getTableType() == HoodieTableType.MERGE_ON_READ + && 
hudiTableInfo.getFlinkConfig().get(FlinkOptions.COMPACTION_SCHEDULE_ENABLED); + } + + private CompletableFuture>> executeCompactionAsync( + HudiCatalogProvider hudiCatalogProvider, WriterInitContext writerInitContext) { + return CompletableFuture.supplyAsync( + () -> { + // Compaction runs in a separate thread, so it intentionally uses an isolated + // Hudi write client instead of sharing the data writer's client state. + try (HudiWriteTableInfo compactionTableInfo = + HudiWriteTableInfo.create( + hudiCatalogProvider, writerInitContext.tablePath())) { + HudiCompactionService compactionService = + HudiCompactionService.forExecutor( + compactionTableInfo, + writerInitContext.tableBucket(), + writerInitContext.partition()); + List instantTimes = + compactionService.getInflightCompactionInstantTimes(); + List> compactionPlans = + compactionService.getCompactionPlans(instantTimes); + return compactionService.executeCompaction(compactionPlans); + } catch (Exception e) { + throw new CompletionException( + String.format( + "Failed to execute Hudi compaction for table %s, bucket %s.", + writerInitContext.tablePath(), + writerInitContext.tableBucket()), + e); + } + }, + compactionExecutor); + } + private void initInstant(Configuration configuration, HoodieTableMetaClient metaClient) { metaClient.reloadActiveTimeline(); WriteOperationType writeOperationType = @@ -136,4 +297,19 @@ private static void validateWriterInitContext(WriterInitContext writerInitContex writerInitContext.tieringRoundTimestamp() != UNKNOWN_TIERING_ROUND_TIMESTAMP, "Hudi lake writer requires tiering round timestamp in WriterInitContext."); } + + private static long getTimeoutMillis(Configuration config, String key, Duration defaultTimeout) + throws IOException { + try { + Duration timeout = + TimeUtils.parseDuration( + config.getString(key, TimeUtils.getStringInMillis(defaultTimeout))); + if (timeout.toMillis() <= 0) { + throw new IllegalArgumentException("timeout must be greater than 0 ms"); + } + return 
timeout.toMillis(); + } catch (RuntimeException e) { + throw new IOException("Invalid Hudi compaction timeout option " + key + ".", e); + } + } } diff --git a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/HudiTableInfo.java b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/HudiTableInfo.java index cb819fab26..21c8f83645 100644 --- a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/HudiTableInfo.java +++ b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/HudiTableInfo.java @@ -111,10 +111,12 @@ public static HudiTableInfo create(TablePath tablePath, Configuration hudiConfig HoodieTableFileSystemView fileSystemView = null; HoodieTableMetaClient metaClient = createMetaClient(basePath, hudiConfig); HoodieTimeline completedTimeline = - metaClient.getCommitsTimeline().filterCompletedInstants(); + metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants(); HoodieEngineContext engineContext = new HoodieLocalEngineContext(metaClient.getStorageConf()); try { + // The source planner gates readable snapshots with completedTimeline. The file + // system view still needs pending compaction instants as MOR file-slice boundaries. 
HoodieTimeline fileSystemViewTimeline = metaClient .getCommitsAndCompactionTimeline() diff --git a/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/testutils/FlinkHudiTieringTestBase.java b/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/testutils/FlinkHudiTieringTestBase.java index 14892fff03..f678c3766c 100644 --- a/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/testutils/FlinkHudiTieringTestBase.java +++ b/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/testutils/FlinkHudiTieringTestBase.java @@ -49,6 +49,8 @@ import org.apache.flink.table.data.RowData; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; @@ -79,6 +81,7 @@ import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.POLL_TIERING_TABLE_INTERVAL; import static org.apache.fluss.lake.committer.LakeCommitter.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY; +import static org.apache.fluss.lake.writer.LakeTieringFactory.FLUSS_LAKE_TIERING_COMMIT_USER; import static org.apache.fluss.testutils.DataTestUtils.row; import static org.apache.fluss.testutils.common.CommonTestUtils.retry; import static org.apache.fluss.testutils.common.CommonTestUtils.waitUntil; @@ -90,6 +93,10 @@ public abstract class FlinkHudiTieringTestBase { protected static final String DEFAULT_DB = "fluss"; protected static final String CATALOG_NAME = "testcatalog"; + protected static final String HUDI_CONF_PREFIX = "hudi."; + + private static final Duration HUDI_COMPACTION_COMMIT_TIMEOUT = Duration.ofSeconds(30); + private static final String COMMITTER_USER = "commit-user"; protected StreamExecutionEnvironment execEnv; 
@@ -183,10 +190,12 @@ protected long createPkTable( .distributedBy(bucketNum) .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true") .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500)) - .customProperty("hudi.precombine.field", "f_time"); + .customProperty(HUDI_CONF_PREFIX + "precombine.field", "f_time"); if (enableAutoCompaction) { tableBuilder.property(ConfigOptions.TABLE_DATALAKE_AUTO_COMPACTION.key(), "true"); + tableBuilder.customProperty( + HUDI_CONF_PREFIX + FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "1"); } return createTable(tablePath, tableBuilder.build()); } @@ -200,11 +209,14 @@ protected long createLogTable( .distributedBy(bucketNum, "f_int") .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true") .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500)) - .customProperty("hudi.precombine.field", "f_str") - .customProperty("hudi." + FlinkOptions.RECORD_KEY_FIELD.key(), "f_int"); + .customProperty(HUDI_CONF_PREFIX + "precombine.field", "f_str") + .customProperty( + HUDI_CONF_PREFIX + FlinkOptions.RECORD_KEY_FIELD.key(), "f_int"); if (enableAutoCompaction) { tableBuilder.property(ConfigOptions.TABLE_DATALAKE_AUTO_COMPACTION.key(), "true"); + tableBuilder.customProperty( + HUDI_CONF_PREFIX + FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "1"); } return createTable(tablePath, tableBuilder.build()); } @@ -335,31 +347,37 @@ protected void checkDataInHudiMORTable( expectedRecords.add(formatMORRow(row)); } - List actualRecords = - collectHudiRows( - tablePath, - partition, - bucket, - record -> - record.getBoolean(5) - + "," - + record.getInt(6) - + "," - + record.getLong(7) - + "," - + record.getFloat(8) - + "," - + record.getDouble(9) - + "," - + record.getString(10).toString() - + "," - + record.getDecimal(11, 5, 2).toBigDecimal().toPlainString() - + "," - + record.getDecimal(12, 20, 0) - .toBigDecimal() - .toPlainString()); - - assertThat(actualRecords).containsExactlyInAnyOrderElementsOf(expectedRecords); 
+ retry( + Duration.ofMinutes(1), + () -> { + List actualRecords = + collectHudiRows( + tablePath, + partition, + bucket, + record -> + record.getBoolean(5) + + "," + + record.getInt(6) + + "," + + record.getLong(7) + + "," + + record.getFloat(8) + + "," + + record.getDouble(9) + + "," + + record.getString(10).toString() + + "," + + record.getDecimal(11, 5, 2) + .toBigDecimal() + .toPlainString() + + "," + + record.getDecimal(12, 20, 0) + .toBigDecimal() + .toPlainString()); + + assertThat(actualRecords).containsExactlyInAnyOrderElementsOf(expectedRecords); + }); } protected void checkDataInHudiMORPartitionTable( @@ -431,16 +449,10 @@ protected void checkFlussOffsetsInSnapshot( HudiTableInfo.create(tablePath, Configuration.fromMap(getHudiCatalogConf()))) { HoodieTableMetaClient metaClient = hudiTableInfo.getMetaClient(); metaClient.reloadActiveTimeline(); - HoodieTimeline timeline = - metaClient - .getActiveTimeline() - .getCommitsAndCompactionTimeline() - .filterCompletedInstants(); - Optional latestInstant = - timeline.getReverseOrderedInstantsByCompletionTime().findFirst(); - assertThat(latestInstant).isPresent(); + HoodieTimeline timeline = hudiTableInfo.getCompletedTimeline(); + HoodieInstant latestInstant = getLatestFlussDataInstant(hudiTableInfo); - HoodieCommitMetadata metadata = timeline.readCommitMetadata(latestInstant.get()); + HoodieCommitMetadata metadata = timeline.readCommitMetadata(latestInstant); Map extraMetadata = metadata.getExtraMetadata(); assertThat(extraMetadata).isNotNull(); String offsetFile = extraMetadata.get(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY); @@ -456,6 +468,12 @@ protected void checkFlussOffsetsInSnapshot( } } + protected void checkHudiCompactionCommitted(TablePath tablePath) { + retry( + HUDI_COMPACTION_COMMIT_TIMEOUT, + () -> assertThat(hasHudiCompactionCommitted(tablePath)).isTrue()); + } + private static String formatMORRow(InternalRow row) { return row.getBoolean(0) + "," @@ -492,12 +510,12 @@ private List collectHudiRows( 
int columnCount = avroSchema.getFields().size(); List fileSlices = - hudiTableInfo - .getFileSystemView() - .getLatestFileSlices(partition) - .collect(Collectors.toList()); + getLatestFileSlicesAtCompletedInstant(hudiTableInfo, partition); List records = new ArrayList<>(); for (FileSlice fileSlice : fileSlices) { + if (fileSlice.isEmpty()) { + continue; + } if (!fileSlice.getFileId().contains(BucketIdentifier.bucketIdStr(bucket))) { continue; } @@ -521,6 +539,24 @@ private List collectHudiRows( } } + private static List getLatestFileSlicesAtCompletedInstant( + HudiTableInfo hudiTableInfo, String partition) { + HoodieInstant latestInstant = getLatestFlussDataInstant(hudiTableInfo); + String latestInstantTime = latestInstant.requestedTime(); + if (hudiTableInfo.getTableType() == HoodieTableType.MERGE_ON_READ) { + return hudiTableInfo + .getFileSystemView() + .getLatestMergedFileSlicesBeforeOrOn(partition, latestInstantTime) + .collect(Collectors.toList()); + } + // includeFileSliceBefore=true lets COW reads include the base file slice visible at this + // completed instant. 
+ return hudiTableInfo + .getFileSystemView() + .getLatestFileSlicesBeforeOrOn(partition, latestInstantTime, true) + .collect(Collectors.toList()); + } + private org.apache.flink.configuration.Configuration buildFlinkHudiOptions( TablePath tablePath, HudiTableInfo hudiTableInfo, @@ -533,6 +569,59 @@ private org.apache.flink.configuration.Configuration buildFlinkHudiOptions( return org.apache.flink.configuration.Configuration.fromMap(hudiOptions); } + private static HoodieInstant getLatestFlussDataInstant(HudiTableInfo hudiTableInfo) { + HoodieTimeline completedTimeline = hudiTableInfo.getCompletedTimeline(); + return completedTimeline + .getReverseOrderedInstants() + .filter(instant -> isFlussDataInstant(completedTimeline, instant)) + .findFirst() + .orElseThrow( + () -> + new IllegalStateException( + "No Fluss data instant found for Hudi table " + + hudiTableInfo.getTablePath() + + ".")); + } + + private static boolean isFlussDataInstant(HoodieTimeline timeline, HoodieInstant instant) { + try { + HoodieCommitMetadata metadata = timeline.readCommitMetadata(instant); + Map extraMetadata = + metadata == null ? 
null : metadata.getExtraMetadata(); + return metadata != null + && metadata.getOperationType() != WriteOperationType.COMPACT + && extraMetadata != null + && FLUSS_LAKE_TIERING_COMMIT_USER.equals(extraMetadata.get(COMMITTER_USER)); + } catch (Exception e) { + throw new RuntimeException("Failed to read Hudi instant metadata " + instant + ".", e); + } + } + + private static boolean hasHudiCompactionCommitted(TablePath tablePath) { + try (HudiTableInfo hudiTableInfo = + HudiTableInfo.create(tablePath, Configuration.fromMap(getHudiCatalogConf()))) { + HoodieTimeline timeline = + hudiTableInfo + .getMetaClient() + .getActiveTimeline() + .getCommitsAndCompactionTimeline() + .filterCompletedInstants(); + return timeline.getInstantsAsStream() + .anyMatch(instant -> isCompactionInstant(timeline, instant)); + } catch (Exception e) { + return false; + } + } + + private static boolean isCompactionInstant(HoodieTimeline timeline, HoodieInstant instant) { + try { + HoodieCommitMetadata metadata = timeline.readCommitMetadata(instant); + return metadata != null && metadata.getOperationType() == WriteOperationType.COMPACT; + } catch (Exception e) { + return false; + } + } + private interface HudiRowFormatter { String format(RowData row); } diff --git a/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/tiering/HudiTieringITCase.java b/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/tiering/HudiTieringITCase.java index 0f1acc0de7..707fb6d393 100644 --- a/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/tiering/HudiTieringITCase.java +++ b/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/tiering/HudiTieringITCase.java @@ -127,6 +127,44 @@ void testTiering() throws Exception { } } + @Test + void testPkTableTieringWithAutoCompaction() throws Exception { + TablePath pkTablePath = TablePath.of(DEFAULT_DB, "pkTableWithAutoCompaction"); + long pkTableId = createPkTable(pkTablePath, 1, true, PK_SCHEMA); + TableBucket 
pkTableBucket = new TableBucket(pkTableId, 0); + + List rows = Arrays.asList(pkRow(1, "v1"), pkRow(2, "v2"), pkRow(3, "v3")); + writeRows(pkTablePath, rows, false); + waitUntilSnapshot(pkTableId, 1); + + execEnv.enableCheckpointing(1000); + JobClient jobClient = buildTieringJob(execEnv); + try { + assertReplicaStatus(pkTableBucket, 3); + checkDataInHudiMORTable(pkTablePath, "", rows, 0); + checkFlussOffsetsInSnapshot(pkTablePath, Collections.singletonMap(pkTableBucket, 3L)); + + rows = Arrays.asList(pkRow(1, "v111"), pkRow(2, "v222"), pkRow(3, "v333")); + writeRows(pkTablePath, rows, false); + + // 3 initial records + 3 delete records + 3 insert records. + assertReplicaStatus(pkTableBucket, 9); + checkDataInHudiMORTable(pkTablePath, "", rows, 0); + checkFlussOffsetsInSnapshot(pkTablePath, Collections.singletonMap(pkTableBucket, 9L)); + + rows = Arrays.asList(pkRow(1, "v1111"), pkRow(2, "v2222"), pkRow(3, "v3333")); + writeRows(pkTablePath, rows, false); + + // 9 previous records + 3 delete records + 3 insert records. 
+ assertReplicaStatus(pkTableBucket, 15); + checkDataInHudiMORTable(pkTablePath, "", rows, 0); + checkFlussOffsetsInSnapshot(pkTablePath, Collections.singletonMap(pkTableBucket, 15L)); + checkHudiCompactionCommitted(pkTablePath); + } finally { + jobClient.cancel().get(); + } + } + private void testLogTableTiering() throws Exception { TablePath logTablePath = TablePath.of(DEFAULT_DB, "logTable"); long logTableId = createLogTable(logTablePath, 1, false, LOG_SCHEMA); @@ -197,8 +235,9 @@ private Tuple2 createPartitionedTable(TablePath partition AutoPartitionTimeUnit.YEAR) .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true) .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500)) - .customProperty("hudi.precombine.field", "date") - .customProperty("hudi.hoodie.datasource.write.recordkey.field", "id") + .customProperty(HUDI_CONF_PREFIX + "precombine.field", "date") + .customProperty( + HUDI_CONF_PREFIX + "hoodie.datasource.write.recordkey.field", "id") .build(); return Tuple2.of( createTable(partitionedTablePath, partitionedTableDescriptor), diff --git a/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/tiering/HudiTieringTest.java b/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/tiering/HudiTieringTest.java index 815daf0695..949aba6be0 100644 --- a/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/tiering/HudiTieringTest.java +++ b/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/tiering/HudiTieringTest.java @@ -20,8 +20,10 @@ import org.apache.fluss.config.Configuration; import org.apache.fluss.lake.committer.CommittedLakeSnapshot; import org.apache.fluss.lake.committer.CommitterInitContext; +import org.apache.fluss.lake.committer.LakeCommitResult; import org.apache.fluss.lake.committer.LakeCommitter; import org.apache.fluss.lake.hudi.HudiLakeCatalog; +import org.apache.fluss.lake.hudi.utils.meta.CkpMetadata; import org.apache.fluss.lake.lakestorage.TestingLakeCatalogContext; 
import org.apache.fluss.lake.writer.LakeWriter; import org.apache.fluss.lake.writer.WriterInitContext; @@ -37,27 +39,50 @@ import org.apache.fluss.row.GenericRow; import org.apache.fluss.types.DataTypes; +import org.apache.hudi.client.HoodieFlinkWriteClient; +import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.Option; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.sink.compact.FlinkCompactionConfig; +import org.apache.hudi.sink.compact.strategy.CompactionPlanStrategy; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import org.mockito.InOrder; import javax.annotation.Nullable; import java.io.File; import java.io.IOException; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import static org.apache.fluss.lake.committer.LakeCommitter.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY; import static org.apache.fluss.lake.writer.LakeTieringFactory.FLUSS_LAKE_TIERING_COMMIT_USER; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyList; +import static org.mockito.Mockito.anyMap; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; /** Test for tiering Fluss records to Hudi via {@link HudiLakeTieringFactory}. 
*/ class HudiTieringTest { + private static final String HUDI_CONF_PREFIX = "hudi."; + private static final String DATA_INSTANT = "20260624000200000"; + private static final String COMPACTION_INSTANT = "20260624000300000"; + @TempDir private File tempWarehouseDir; private Configuration hudiConfig; @@ -130,9 +155,8 @@ tablePath, new TableBucket(1L, 0), tableInfo, 0, 0L))) { } @Test - void testRejectCompactionWriteStatusesOnCommit() throws Exception { - TablePath tablePath = - TablePath.of("hudi", "test_reject_compaction_write_statuses_on_commit"); + void testRejectMissingWriteStatsOnCommit() throws Exception { + TablePath tablePath = TablePath.of("hudi", "test_reject_missing_write_stats_on_commit"); TableDescriptor tableDescriptor = createLogTableDescriptor(); hudiLakeCatalog.createTable( tablePath, tableDescriptor, new TestingLakeCatalogContext(tableDescriptor)); @@ -150,16 +174,176 @@ void testRejectCompactionWriteStatusesOnCommit() throws Exception { assertThatThrownBy(() -> committer.commit(committable, Collections.emptyMap())) .isInstanceOf(IOException.class) - .hasMessageContaining("Hudi compaction write stats are not supported yet"); + .hasMessageContaining("Hudi write stats must contain exactly one instant"); } } + @Test + void testCommitReturnsDataInstantAndSchedulesCompaction() throws Exception { + TestingCommitterContext context = createTestingCommitterContext(); + HudiCommittable committable = committableWithCompaction(); + whenDataCommitSucceeds(context); + when(context.compactionService.commitCompaction( + eq(committable.getCompactionWriteStats()), anyMap())) + .thenReturn(COMPACTION_INSTANT); + + LakeCommitResult commitResult = + context.committer.commit(committable, Collections.emptyMap()); + + assertThat(commitResult.getCommittedSnapshotId()).isEqualTo(Long.parseLong(DATA_INSTANT)); + InOrder inOrder = + inOrder(context.writeClient, context.ckpMetadata, context.compactionService); + inOrder.verify(context.writeClient) + 
.commitStats(eq(DATA_INSTANT), anyList(), any(Option.class), eq("commit")); + inOrder.verify(context.ckpMetadata).commitInstant(DATA_INSTANT); + inOrder.verify(context.compactionService) + .commitCompaction(eq(committable.getCompactionWriteStats()), anyMap()); + inOrder.verify(context.compactionService).scheduleCompaction(); + inOrder.verify(context.compactionService).markSelectedCompactionsInflight(); + } + + @Test + void testCommitFailsWhenCompactionCommitFails() throws Exception { + TestingCommitterContext context = createTestingCommitterContext(); + HudiCommittable committable = committableWithCompaction(); + whenDataCommitSucceeds(context); + doThrow(new IOException("compaction failed")) + .when(context.compactionService) + .commitCompaction(eq(committable.getCompactionWriteStats()), anyMap()); + + assertThatThrownBy(() -> context.committer.commit(committable, Collections.emptyMap())) + .isInstanceOf(IOException.class) + .hasMessageContaining("compaction failed"); + verify(context.compactionService, never()).scheduleCompaction(); + verify(context.compactionService, never()).markSelectedCompactionsInflight(); + } + + @Test + void testScheduleCompactionFailureDoesNotFailCommit() throws Exception { + TestingCommitterContext context = createTestingCommitterContext(); + HudiCommittable committable = committableWithCompaction(); + whenDataCommitSucceeds(context); + when(context.compactionService.commitCompaction( + eq(committable.getCompactionWriteStats()), anyMap())) + .thenReturn(COMPACTION_INSTANT); + doThrow(new IOException("schedule failed")) + .when(context.compactionService) + .scheduleCompaction(); + + LakeCommitResult commitResult = + context.committer.commit(committable, Collections.emptyMap()); + + assertThat(commitResult.getCommittedSnapshotId()).isEqualTo(Long.parseLong(DATA_INSTANT)); + verify(context.compactionService, never()).markSelectedCompactionsInflight(); + } + + @Test + void testAbortRollsBackCompactionBeforeDataInstant() throws Exception { + 
TestingCommitterContext context = createTestingCommitterContext(); + when(context.writeClient.getHoodieTable()) + .thenThrow(new RuntimeException("rollback failed")); + when(context.writeClient.rollback(DATA_INSTANT)).thenReturn(true); + + assertThatThrownBy(() -> context.committer.abort(committableWithCompaction())) + .isInstanceOf(IOException.class) + .hasMessageContaining(COMPACTION_INSTANT); + InOrder inOrder = inOrder(context.metaClient, context.writeClient, context.ckpMetadata); + inOrder.verify(context.metaClient).reloadActiveTimeline(); + inOrder.verify(context.writeClient).getHoodieTable(); + inOrder.verify(context.writeClient).rollback(DATA_INSTANT); + inOrder.verify(context.ckpMetadata).abortInstant(DATA_INSTANT); + } + + @Test + void testFlinkCompactionConfigMapping() { + org.apache.flink.configuration.Configuration config = + new org.apache.flink.configuration.Configuration(); + config.set(FlinkOptions.PATH, "/tmp/hudi/table"); + config.set(FlinkOptions.COMPACTION_TRIGGER_STRATEGY, FlinkCompactionConfig.NUM_OR_TIME); + config.set(FlinkOptions.ARCHIVE_MAX_COMMITS, 30); + config.set(FlinkOptions.ARCHIVE_MIN_COMMITS, 20); + config.set(FlinkOptions.CLEAN_POLICY, "KEEP_LATEST_COMMITS"); + config.set(FlinkOptions.CLEAN_RETAIN_COMMITS, 10); + config.set(FlinkOptions.CLEAN_RETAIN_HOURS, 24); + config.set(FlinkOptions.CLEAN_RETAIN_FILE_VERSIONS, 5); + config.set(FlinkOptions.COMPACTION_DELTA_COMMITS, 2); + config.set(FlinkOptions.COMPACTION_DELTA_SECONDS, 120); + config.set(FlinkOptions.COMPACTION_MAX_MEMORY, 256); + config.set(FlinkOptions.COMPACTION_TARGET_IO, 1024L); + config.set(FlinkOptions.COMPACTION_TASKS, 3); + config.set(FlinkOptions.CLEAN_ASYNC_ENABLED, true); + config.set(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, true); + + FlinkCompactionConfig compactionConfig = + HudiCompactionService.toFlinkCompactionConfig(config); + + assertThat(compactionConfig.path).isEqualTo("/tmp/hudi/table"); + assertThat(compactionConfig.compactionTriggerStrategy) + 
.isEqualTo(FlinkCompactionConfig.NUM_OR_TIME); + assertThat(compactionConfig.archiveMaxCommits).isEqualTo(30); + assertThat(compactionConfig.archiveMinCommits).isEqualTo(20); + assertThat(compactionConfig.cleanPolicy).isEqualTo("KEEP_LATEST_COMMITS"); + assertThat(compactionConfig.cleanRetainCommits).isEqualTo(10); + assertThat(compactionConfig.cleanRetainHours).isEqualTo(24); + assertThat(compactionConfig.cleanRetainFileVersions).isEqualTo(5); + assertThat(compactionConfig.compactionDeltaCommits).isEqualTo(2); + assertThat(compactionConfig.compactionDeltaSeconds).isEqualTo(120); + assertThat(compactionConfig.compactionMaxMemory).isEqualTo(256); + assertThat(compactionConfig.compactionTargetIo).isEqualTo(1024L); + assertThat(compactionConfig.compactionTasks).isEqualTo(3); + assertThat(compactionConfig.cleanAsyncEnable).isTrue(); + assertThat(compactionConfig.schedule).isTrue(); + assertThat(compactionConfig.compactionPlanSelectStrategy) + .isEqualTo(CompactionPlanStrategy.NUM_INSTANTS); + assertThat(compactionConfig.maxNumCompactionPlans).isEqualTo(1); + } + private LakeCommitter createLakeCommitter( TablePath tablePath, TableInfo tableInfo) throws IOException { return hudiLakeTieringFactory.createLakeCommitter( new TestingCommitterInitContext(tablePath, tableInfo)); } + private static TestingCommitterContext createTestingCommitterContext() { + HudiWriteTableInfo hudiTableInfo = mock(HudiWriteTableInfo.class); + HoodieFlinkWriteClient writeClient = mock(HoodieFlinkWriteClient.class); + HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class); + CkpMetadata ckpMetadata = mock(CkpMetadata.class); + HudiCompactionService compactionService = mock(HudiCompactionService.class); + org.apache.flink.configuration.Configuration flinkConfig = + new org.apache.flink.configuration.Configuration(); + flinkConfig.set(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, true); + + when(hudiTableInfo.getWriteClient()).thenReturn(writeClient); + 
when(hudiTableInfo.getMetaClient()).thenReturn(metaClient); + when(hudiTableInfo.getFlinkConfig()).thenReturn(flinkConfig); + when(hudiTableInfo.getTableType()).thenReturn(HoodieTableType.MERGE_ON_READ); + when(hudiTableInfo.getTablePath()).thenReturn(TablePath.of("hudi", "mock_table")); + when(metaClient.getCommitActionType()).thenReturn("commit"); + return new TestingCommitterContext( + new HudiLakeCommitter(hudiTableInfo, ckpMetadata, compactionService), + writeClient, + metaClient, + ckpMetadata, + compactionService); + } + + private static void whenDataCommitSucceeds(TestingCommitterContext context) { + when(context.writeClient.commitStats( + eq(DATA_INSTANT), anyList(), any(Option.class), eq("commit"))) + .thenReturn(true); + } + + private static HudiCommittable committableWithCompaction() { + Map writeStats = new HashMap<>(); + writeStats.put( + DATA_INSTANT, new HudiWriteStats(Collections.singletonList(writeStat()), 0L)); + Map compactionWriteStats = new HashMap<>(); + compactionWriteStats.put( + COMPACTION_INSTANT, new HudiWriteStats(Collections.singletonList(writeStat()), 0L)); + return new HudiCommittable(writeStats, compactionWriteStats); + } + private static TableDescriptor createLogTableDescriptor() { return TableDescriptor.builder() .schema( @@ -168,8 +352,8 @@ private static TableDescriptor createLogTableDescriptor() { .column("name", DataTypes.STRING()) .build()) .distributedBy(1, "id") - .customProperty("hudi." 
+ FlinkOptions.RECORD_KEY_FIELD.key(), "id") - .customProperty("hudi.precombine.field", "name") + .customProperty(HUDI_CONF_PREFIX + FlinkOptions.RECORD_KEY_FIELD.key(), "id") + .customProperty(HUDI_CONF_PREFIX + "precombine.field", "name") .build(); } @@ -188,6 +372,28 @@ private static HoodieWriteStat writeStat() { return writeStat; } + private static class TestingCommitterContext { + + private final HudiLakeCommitter committer; + private final HoodieFlinkWriteClient writeClient; + private final HoodieTableMetaClient metaClient; + private final CkpMetadata ckpMetadata; + private final HudiCompactionService compactionService; + + private TestingCommitterContext( + HudiLakeCommitter committer, + HoodieFlinkWriteClient writeClient, + HoodieTableMetaClient metaClient, + CkpMetadata ckpMetadata, + HudiCompactionService compactionService) { + this.committer = committer; + this.writeClient = writeClient; + this.metaClient = metaClient; + this.ckpMetadata = ckpMetadata; + this.compactionService = compactionService; + } + } + private static class TestingWriterInitContext implements WriterInitContext { private final TablePath tablePath;