-
Notifications
You must be signed in to change notification settings - Fork 564
[lake/hudi] Introduce Hudi compaction in tiering service #3518
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -27,24 +27,26 @@ | |
|
|
||
| import org.apache.hudi.client.HoodieFlinkWriteClient; | ||
| import org.apache.hudi.common.model.HoodieCommitMetadata; | ||
| import org.apache.hudi.common.model.HoodieTableType; | ||
| import org.apache.hudi.common.model.WriteOperationType; | ||
| import org.apache.hudi.common.table.timeline.HoodieInstant; | ||
| import org.apache.hudi.common.table.timeline.HoodieTimeline; | ||
| import org.apache.hudi.common.util.Option; | ||
| import org.apache.hudi.configuration.FlinkOptions; | ||
| import org.apache.hudi.exception.HoodieException; | ||
| import org.apache.hudi.util.CompactionUtil; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| import javax.annotation.Nullable; | ||
|
|
||
| import java.io.IOException; | ||
| import java.io.UncheckedIOException; | ||
| import java.util.Collections; | ||
| import java.util.HashMap; | ||
| import java.util.LinkedHashSet; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Optional; | ||
| import java.util.Set; | ||
|
|
||
| import static org.apache.fluss.lake.writer.LakeTieringFactory.FLUSS_LAKE_TIERING_COMMIT_USER; | ||
|
|
||
|
|
@@ -58,6 +60,7 @@ public class HudiLakeCommitter implements LakeCommitter<HudiWriteResult, HudiCom | |
| private final HudiWriteTableInfo hudiTableInfo; | ||
| private final HoodieFlinkWriteClient<?> writeClient; | ||
| private final CkpMetadata ckpMetadata; | ||
| private final HudiCompactionService compactionService; | ||
|
|
||
| public HudiLakeCommitter( | ||
| HudiCatalogProvider hudiCatalogProvider, | ||
|
|
@@ -67,10 +70,21 @@ public HudiLakeCommitter( | |
| this.hudiTableInfo = HudiWriteTableInfo.create(hudiCatalogProvider, tablePath); | ||
| this.writeClient = hudiTableInfo.getWriteClient(); | ||
| this.ckpMetadata = ckpMetadataProvider.get(tablePath, hudiTableInfo); | ||
| this.compactionService = HudiCompactionService.forScheduler(hudiTableInfo); | ||
| LOG.info( | ||
| "Created HudiLakeCommitter with configuration {}.", hudiTableInfo.getFlinkConfig()); | ||
| } | ||
|
|
||
| HudiLakeCommitter( | ||
| HudiWriteTableInfo hudiTableInfo, | ||
| CkpMetadata ckpMetadata, | ||
| HudiCompactionService compactionService) { | ||
| this.hudiTableInfo = hudiTableInfo; | ||
| this.writeClient = hudiTableInfo.getWriteClient(); | ||
| this.ckpMetadata = ckpMetadata; | ||
| this.compactionService = compactionService; | ||
| } | ||
|
|
||
| @Override | ||
| public HudiCommittable toCommittable(List<HudiWriteResult> hudiWriteResults) { | ||
| HudiCommittable.Builder committableBuilder = HudiCommittable.builder(); | ||
|
|
@@ -85,8 +99,6 @@ public HudiCommittable toCommittable(List<HudiWriteResult> hudiWriteResults) { | |
| public LakeCommitResult commit( | ||
| HudiCommittable committable, Map<String, String> snapshotProperties) | ||
| throws IOException { | ||
| ensureNoCompactionWriteStats(committable); | ||
|
|
||
| Map<String, HudiWriteStats> writeStatsByInstant = committable.getWriteStats(); | ||
| if (writeStatsByInstant.size() != 1) { | ||
| throw new IOException( | ||
|
|
@@ -129,6 +141,7 @@ public LakeCommitResult commit( | |
|
|
||
| ckpMetadata.commitInstant(instant); | ||
| LOG.info("Committed Hudi instant {} successfully.", instant); | ||
| commitCompactionAndSchedule(committable.getCompactionWriteStats()); | ||
| return LakeCommitResult.committedIsReadable(parseSnapshotId(instant)); | ||
| } catch (Exception e) { | ||
| if (e instanceof IOException) { | ||
|
|
@@ -140,10 +153,21 @@ public LakeCommitResult commit( | |
|
|
||
| @Override | ||
| public void abort(HudiCommittable committable) throws IOException { | ||
| Set<String> instants = new LinkedHashSet<>(committable.getWriteStats().keySet()); | ||
| instants.addAll(committable.getCompactionWriteStats().keySet()); | ||
| IOException failure = null; | ||
| for (String instant : instants) { | ||
| for (String instant : committable.getCompactionWriteStats().keySet()) { | ||
| try { | ||
| abortCompactionInstant(instant); | ||
| LOG.info("Aborted Hudi compaction instant {}.", instant); | ||
| } catch (IOException e) { | ||
| failure = | ||
| addSuppressed( | ||
| failure, | ||
| new IOException( | ||
| "Failed to abort Hudi compaction instant " + instant + ".", | ||
| e)); | ||
| } | ||
| } | ||
| for (String instant : committable.getWriteStats().keySet()) { | ||
| try { | ||
| abortInstant(instant); | ||
| LOG.info("Aborted Hudi instant {}.", instant); | ||
|
|
@@ -160,17 +184,6 @@ public void abort(HudiCommittable committable) throws IOException { | |
| } | ||
| } | ||
|
|
||
| private static void ensureNoCompactionWriteStats(HudiCommittable committable) | ||
| throws IOException { | ||
| Map<String, HudiWriteStats> compactionWriteStats = committable.getCompactionWriteStats(); | ||
| if (!compactionWriteStats.isEmpty()) { | ||
| throw new IOException( | ||
| "Hudi compaction write stats are not supported yet, but got instants " | ||
| + compactionWriteStats.keySet() | ||
| + "."); | ||
| } | ||
| } | ||
|
|
||
| @Nullable | ||
| @Override | ||
| public CommittedLakeSnapshot getMissingLakeSnapshot(@Nullable Long latestLakeSnapshotIdOfFluss) | ||
|
|
@@ -207,6 +220,34 @@ public void close() throws Exception { | |
| IOUtils.closeQuietly(hudiTableInfo, "hudi table info"); | ||
| } | ||
|
|
||
| private void commitCompactionAndSchedule(Map<String, HudiWriteStats> compactionWriteStats) | ||
| throws IOException { | ||
| if (!isAutoCompactionEnabled()) { | ||
| return; | ||
| } | ||
|
|
||
| String latestCommittedInstant = | ||
| compactionService.commitCompaction(compactionWriteStats, Collections.emptyMap()); | ||
| if (latestCommittedInstant != null) { | ||
| LOG.info("Committed Hudi compaction instant {}.", latestCommittedInstant); | ||
| } | ||
|
|
||
| try { | ||
| compactionService.scheduleCompaction(); | ||
| compactionService.markSelectedCompactionsInflight(); | ||
| } catch (Exception e) { | ||
| LOG.warn( | ||
| "Failed to schedule Hudi compaction for table {}.", | ||
| hudiTableInfo.getTablePath(), | ||
| e); | ||
| } | ||
| } | ||
|
|
||
| private boolean isAutoCompactionEnabled() { | ||
| return hudiTableInfo.getTableType() == HoodieTableType.MERGE_ON_READ | ||
| && hudiTableInfo.getFlinkConfig().get(FlinkOptions.COMPACTION_SCHEDULE_ENABLED); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This uses Hudi's own Please unify on a single source of truth. Easiest path: in |
||
| } | ||
|
|
||
| private void validateWriteStats(String instant, HudiWriteStats writeStats) { | ||
| long totalErrorRecords = writeStats.getTotalErrorRecords(); | ||
| if (totalErrorRecords > 0 | ||
|
|
@@ -250,6 +291,16 @@ private void abortInstant(String instant) throws IOException { | |
| } | ||
| } | ||
|
|
||
| private void abortCompactionInstant(String instant) throws IOException { | ||
| try { | ||
| hudiTableInfo.getMetaClient().reloadActiveTimeline(); | ||
| CompactionUtil.rollbackCompaction( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| writeClient.getHoodieTable(), instant, writeClient.getTransactionManager()); | ||
| } catch (Exception e) { | ||
| throw new IOException("Failed to rollback Hudi compaction instant " + instant + ".", e); | ||
| } | ||
| } | ||
|
|
||
| private HoodieTimeline getCompletedTimelineCommittedBy(String commitUser) throws IOException { | ||
| hudiTableInfo.getMetaClient().reloadActiveTimeline(); | ||
| HoodieTimeline timeline = | ||
|
|
@@ -274,7 +325,9 @@ private static boolean isCommittedBy( | |
| throw new IOException("Failed to load committed Hudi instant metadata."); | ||
| } | ||
| Map<String, String> extraMetadata = metadata.getExtraMetadata(); | ||
| return extraMetadata != null && commitUser.equals(extraMetadata.get(COMMITTER_USER)); | ||
| return metadata.getOperationType() != WriteOperationType.COMPACT | ||
| && extraMetadata != null | ||
| && commitUser.equals(extraMetadata.get(COMMITTER_USER)); | ||
| } catch (IOException e) { | ||
| // a read failure must not be silently treated as "not committed by Fluss", | ||
| // otherwise we may miss an already-tiered snapshot and re-commit duplicated data | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Skipping empty file slices is correct for MOR, but please add a short comment explaining why we can encounter an empty slice here (e.g. a pending compaction has scheduled a new file group but no base file has been written yet). Without that context the next reader will likely wonder if this is hiding a bug.