diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplit.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplit.java
index 925377067e..ed47361c3c 100644
--- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplit.java
+++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplit.java
@@ -20,11 +20,8 @@
 
 import org.apache.fluss.lake.source.LakeSplit;
 
-import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.table.source.DataSplit;
 
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 
 /** Split for paimon table. */
@@ -36,9 +33,13 @@ public class PaimonSplit implements LakeSplit {
 
     private final boolean isBucketUnAware;
 
-    public PaimonSplit(DataSplit dataSplit, boolean isBucketUnAware) {
+    // Partition values in Fluss partition-name format
+    private final List<String> partition;
+
+    public PaimonSplit(DataSplit dataSplit, boolean isBucketUnAware, List<String> partition) {
         this.dataSplit = dataSplit;
         this.isBucketUnAware = isBucketUnAware;
+        this.partition = partition;
     }
 
     @Override
@@ -52,19 +53,7 @@ public int bucket() {
 
     @Override
     public List<String> partition() {
-        BinaryRow partition = dataSplit.partition();
-        if (partition.getFieldCount() == 0) {
-            return Collections.emptyList();
-        }
-
-        List<String> partitions = new ArrayList<>();
-        for (int i = 0; i < partition.getFieldCount(); i++) {
-            // Todo Currently, partition column must be String datatype, so we can always use
-            // consider it as string. Revisit here when
-            // #489 is finished.
- partitions.add(partition.getString(i).toString()); - } - return partitions; + return partition; } public DataSplit dataSplit() { diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplitPlanner.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplitPlanner.java index db723f581f..76139e6e80 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplitPlanner.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplitPlanner.java @@ -19,6 +19,7 @@ package org.apache.fluss.lake.paimon.source; import org.apache.fluss.config.Configuration; +import org.apache.fluss.lake.paimon.utils.PaimonPartitionUtils; import org.apache.fluss.lake.source.Planner; import org.apache.fluss.metadata.TablePath; @@ -33,6 +34,7 @@ import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.InnerTableScan; import org.apache.paimon.table.source.Split; +import org.apache.paimon.types.RowType; import javax.annotation.Nullable; @@ -69,13 +71,19 @@ public List plan() { FileStoreTable fileStoreTable = getTable(catalog, tablePath, snapshotId); InnerTableScan tableScan = fileStoreTable.newScan(); boolean isBucketUnAware = fileStoreTable.bucketMode() == BucketMode.BUCKET_UNAWARE; + RowType partitionType = fileStoreTable.schema().logicalPartitionType(); if (predicate != null) { tableScan = tableScan.withFilter(predicate); } for (Split split : tableScan.plan().splits()) { DataSplit dataSplit = (DataSplit) split; - splits.add(new PaimonSplit(dataSplit, isBucketUnAware)); + List partition = + dataSplit.partition().getFieldCount() == 0 + ? 
Collections.emptyList() + : PaimonPartitionUtils.partitionValues( + dataSplit.partition(), partitionType); + splits.add(new PaimonSplit(dataSplit, isBucketUnAware, partition)); } } return splits; diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplitSerializer.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplitSerializer.java index c5e4d38d1d..4ba731e5de 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplitSerializer.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplitSerializer.java @@ -28,6 +28,8 @@ import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; /** Serializer for paimon split. */ public class PaimonSplitSerializer implements SimpleVersionedSerializer { @@ -46,6 +48,11 @@ public byte[] serialize(PaimonSplit paimonSplit) throws IOException { DataSplit dataSplit = paimonSplit.dataSplit(); InstantiationUtil.serializeObject(view, dataSplit); view.writeBoolean(paimonSplit.isBucketUnAware()); + List partition = paimonSplit.partition(); + view.writeInt(partition.size()); + for (String value : partition) { + view.writeUTF(value); + } return out.toByteArray(); } @@ -59,7 +66,12 @@ public PaimonSplit deserialize(int version, byte[] serialized) throws IOExceptio if (version == VERSION_1) { DataInputStream dis = new DataInputStream(in); boolean isBucketUnAware = dis.readBoolean(); - return new PaimonSplit(dataSplit, isBucketUnAware); + int size = dis.readInt(); + List partition = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + partition.add(dis.readUTF()); + } + return new PaimonSplit(dataSplit, isBucketUnAware, partition); } else { throw new IOException("Unsupported PaimonSplit serialization version: " + version); } diff --git 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java
index 54c3c7f948..9d9485155d 100644
--- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java
+++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java
@@ -35,7 +35,6 @@
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.manifest.FileKind;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.operation.FileStoreScan;
@@ -48,7 +47,6 @@
 import javax.annotation.Nullable;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -601,13 +599,10 @@ private Map getPartitionNameToIdMapping() throws IOException {
      * @return partition name string
      */
     private String getPartitionNameFromBinaryRow(BinaryRow partition) {
-        List<String> partitionValues = new ArrayList<>();
-        for (int i = 0; i < partition.getFieldCount(); i++) {
-            // todo: consider other partition type
-            BinaryString binaryString = partition.getString(i);
-            partitionValues.add(binaryString.toString());
-        }
-        return String.join(ResolvedPartitionSpec.PARTITION_SPEC_SEPARATOR, partitionValues);
+        return String.join(
+                ResolvedPartitionSpec.PARTITION_SPEC_SEPARATOR,
+                PaimonPartitionUtils.partitionValues(
+                        partition, fileStoreTable.schema().logicalPartitionType()));
     }
 
     @Override
diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonPartitionUtils.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonPartitionUtils.java
new file mode 100644
index 
0000000000..967bee34bc
--- /dev/null
+++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonPartitionUtils.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.paimon.utils;
+
+import org.apache.fluss.row.TimestampLtz;
+import org.apache.fluss.row.TimestampNtz;
+import org.apache.fluss.utils.PartitionNameConverters;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeChecks;
+import org.apache.paimon.types.RowType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Extracts partition values from a Paimon partition row as Fluss partition-name strings. */
+public class PaimonPartitionUtils {
+
+    private PaimonPartitionUtils() {}
+
+    /**
+     * Converts a Paimon partition row into Fluss partition value strings, in partition-key order.
+     * The output must match {@code PartitionUtils#convertValueOfType} (the Fluss-side format),
+     * otherwise lake-side and Fluss-side partition names won't match during union read split
+     * generation.
+     */
+    public static List<String> partitionValues(BinaryRow partition, RowType partitionType) {
+        List<String> values = new ArrayList<>(partition.getFieldCount());
+        for (int i = 0; i < partition.getFieldCount(); i++) {
+            values.add(toFlussPartitionString(partition, i, partitionType.getTypeAt(i)));
+        }
+        return values;
+    }
+
+    private static String toFlussPartitionString(BinaryRow partition, int pos, DataType type) {
+        switch (type.getTypeRoot()) {
+            case CHAR:
+            case VARCHAR:
+                return partition.getString(pos).toString();
+            case BOOLEAN:
+                return Boolean.toString(partition.getBoolean(pos));
+            case TINYINT:
+                return Byte.toString(partition.getByte(pos));
+            case SMALLINT:
+                return Short.toString(partition.getShort(pos));
+            case INTEGER:
+                return Integer.toString(partition.getInt(pos));
+            case BIGINT:
+                return Long.toString(partition.getLong(pos));
+            case FLOAT:
+                return PartitionNameConverters.reformatFloat(partition.getFloat(pos));
+            case DOUBLE:
+                return PartitionNameConverters.reformatDouble(partition.getDouble(pos));
+            case DATE:
+                return PartitionNameConverters.dayToString(partition.getInt(pos));
+            case TIME_WITHOUT_TIME_ZONE:
+                return PartitionNameConverters.milliToString(partition.getInt(pos));
+            case BINARY:
+            case VARBINARY:
+                return PartitionNameConverters.hexString(partition.getBinary(pos));
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                {
+                    Timestamp ts = partition.getTimestamp(pos, DataTypeChecks.getPrecision(type));
+                    return PartitionNameConverters.timestampToString(
+                            TimestampNtz.fromMillis(
+                                    ts.getMillisecond(), ts.getNanoOfMillisecond()));
+                }
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                {
+                    Timestamp ts = partition.getTimestamp(pos, DataTypeChecks.getPrecision(type));
+                    return PartitionNameConverters.timestampToString(
+                            TimestampLtz.fromEpochMillis(
+                                    ts.getMillisecond(), ts.getNanoOfMillisecond()));
+                }
+            default:
+                throw new IllegalArgumentException(
+                        "Unsupported partition column type: " + type.getTypeRoot());
+        }
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonSplitSerializerTest.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonSplitSerializerTest.java
index 3f72cea558..dc75f34e44 100644
--- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonSplitSerializerTest.java
+++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonSplitSerializerTest.java
@@ -77,6 +77,7 @@ void testSerializeAndDeserialize() throws Exception {
 
         assertThat(deserialized.dataSplit()).isEqualTo(originalPaimonSplit.dataSplit());
         assertThat(deserialized.isBucketUnAware()).isEqualTo(originalPaimonSplit.isBucketUnAware());
+        assertThat(deserialized.partition()).isEqualTo(originalPaimonSplit.partition());
     }
 
     @Test
diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonSplitTest.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonSplitTest.java
index a81800374d..9d5b5ee5de 100644
--- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonSplitTest.java
+++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonSplitTest.java
@@ -35,6 +35,7 @@
 import org.apache.paimon.types.DataTypes;
 import org.junit.jupiter.api.Test;
 
+import java.time.LocalDate;
 import java.util.Collections;
 import java.util.List;
 
@@ -77,4 +78,33 @@ void testPaimonSplit() throws Exception {
         assertThat(actualSplit).isEqualTo(paimonSplit.dataSplit());
         assertThat(((DataSplit) actualSplit).bucket()).isEqualTo(paimonSplit.bucket());
     }
+
+    @Test
+    void testPaimonSplitWithDatePartition() throws Exception {
+        int bucketNum = 1;
+        TablePath tablePath = TablePath.of(DEFAULT_DB, "non_string_partition_table");
+        Schema.Builder builder =
+                Schema.newBuilder()
+                        .column("c1", DataTypes.INT())
+                        .column("c2", DataTypes.STRING())
+                        .column("dt", DataTypes.DATE());
+        builder.partitionKeys("dt");
+        builder.primaryKey("c1", "dt");
+        builder.option(CoreOptions.BUCKET.key(), String.valueOf(bucketNum));
+        createTable(tablePath, builder.build());
+        Table table = getTable(tablePath);
+
+        int epochDay = (int) LocalDate.of(2024, 3, 1).toEpochDay();
+        GenericRow record1 = GenericRow.of(12, BinaryString.fromString("a"), epochDay);
+        writeRecord(tablePath, Collections.singletonList(record1));
+        Snapshot snapshot = table.latestSnapshot().get();
+
+        LakeSource lakeSource = lakeStorage.createLakeSource(tablePath);
+        List<PaimonSplit> paimonSplits = lakeSource.createPlanner(snapshot::id).plan();
+
+        // The DATE partition must be rendered in Fluss partition-name format ("2024-03-01"),
+        // not read blindly via BinaryRow.getString which yields garbage for non-String columns.
+        PaimonSplit paimonSplit = paimonSplits.get(0);
+        assertThat(paimonSplit.partition()).isEqualTo(Collections.singletonList("2024-03-01"));
+    }
 }
diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/PaimonPartitionUtilsTest.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/PaimonPartitionUtilsTest.java
new file mode 100644
index 0000000000..fd5866d18f
--- /dev/null
+++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/PaimonPartitionUtilsTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.paimon.utils;
+
+import org.apache.fluss.row.TimestampLtz;
+import org.apache.fluss.row.TimestampNtz;
+import org.apache.fluss.types.DataTypeRoot;
+import org.apache.fluss.utils.PartitionUtils;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+import org.junit.jupiter.api.Test;
+
+import java.time.LocalDate;
+import java.util.function.Consumer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link PaimonPartitionUtils}. */
+class PaimonPartitionUtilsTest {
+
+    /** Each type's lake-side string must equal the Fluss-side name ({@code convertValueOfType}). */
+    @Test
+    void testTypesMatchFlussName() {
+        int epochDay = (int) LocalDate.of(2024, 3, 1).toEpochDay();
+        int milliOfDay = ((12 * 60 + 34) * 60 + 56) * 1000 + 789;
+        long ms = 1709294096123L;
+        int nanos = 456000; // multiple of 1000 to survive microsecond precision
+        byte[] bytes = {1, 2, 3, (byte) 0xAB};
+
+        assertMatches(
+                DataTypes.BOOLEAN(), w -> w.writeBoolean(0, true), true, DataTypeRoot.BOOLEAN);
+        assertMatches(
+                DataTypes.TINYINT(), w -> w.writeByte(0, (byte) 7), (byte) 7, DataTypeRoot.TINYINT);
+        assertMatches(
+                DataTypes.SMALLINT(),
+                w -> w.writeShort(0, (short) 300),
+                (short) 300,
+                DataTypeRoot.SMALLINT);
+        assertMatches(DataTypes.INT(), w -> w.writeInt(0, 42), 42, DataTypeRoot.INTEGER);
+        assertMatches(DataTypes.BIGINT(), w -> w.writeLong(0, 123L), 123L, DataTypeRoot.BIGINT);
+        assertMatches(DataTypes.FLOAT(), w -> w.writeFloat(0, 1.5f), 1.5f, DataTypeRoot.FLOAT);
+        assertMatches(DataTypes.DOUBLE(), w -> w.writeDouble(0, 2.5d), 2.5d, DataTypeRoot.DOUBLE);
+        assertMatches(DataTypes.DATE(), w -> w.writeInt(0, epochDay), epochDay, DataTypeRoot.DATE);
+        assertMatches(
+                DataTypes.TIME(),
+                w -> w.writeInt(0, milliOfDay),
+                milliOfDay,
+                DataTypeRoot.TIME_WITHOUT_TIME_ZONE);
+        assertMatches(DataTypes.BYTES(), w -> w.writeBinary(0, bytes), bytes, DataTypeRoot.BYTES);
+        assertMatches(
+                DataTypes.TIMESTAMP(6),
+                w -> w.writeTimestamp(0, Timestamp.fromEpochMillis(ms, nanos), 6),
+                TimestampNtz.fromMillis(ms, nanos),
+                DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE);
+        assertMatches(
+                DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6),
+                w -> w.writeTimestamp(0, Timestamp.fromEpochMillis(ms, nanos), 6),
+                TimestampLtz.fromEpochMillis(ms, nanos),
+                DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
+    }
+
+    @Test
+    void testStringPartition() {
+        BinaryRow partition = new BinaryRow(1);
+        BinaryRowWriter writer = new BinaryRowWriter(partition);
+        writer.writeString(0, BinaryString.fromString("a"));
+        writer.complete();
+
+        assertThat(PaimonPartitionUtils.partitionValues(partition, RowType.of(DataTypes.STRING())))
+                .containsExactly("a");
+    }
+
+    @Test
+    void testMultiColumnPartition() {
+        int epochDay = (int) LocalDate.of(2024, 3, 1).toEpochDay();
+        BinaryRow partition = new BinaryRow(2);
+        BinaryRowWriter writer = new BinaryRowWriter(partition);
+        writer.writeInt(0, epochDay);
+        writer.writeInt(1, 42);
+        writer.complete();
+
+        assertThat(
+                        PaimonPartitionUtils.partitionValues(
+                                partition, RowType.of(DataTypes.DATE(), DataTypes.INT())))
+                .containsExactly("2024-03-01", "42");
+    }
+
+    private static void assertMatches(
+            DataType paimonType,
+            Consumer<BinaryRowWriter> writeValue,
+            Object flussValue,
+            DataTypeRoot flussRoot) {
+        BinaryRow partition = new BinaryRow(1);
+        BinaryRowWriter writer = new BinaryRowWriter(partition);
+        writeValue.accept(writer);
+        writer.complete();
+        assertThat(PaimonPartitionUtils.partitionValues(partition, RowType.of(paimonType)))
+                .as("type %s", flussRoot)
+                .containsExactly(PartitionUtils.convertValueOfType(flussValue, flussRoot));
+    }
+}