Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,8 @@

import org.apache.fluss.lake.source.LakeSplit;

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.table.source.DataSplit;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Split for paimon table. */
Expand All @@ -36,9 +33,13 @@ public class PaimonSplit implements LakeSplit {

private final boolean isBucketUnAware;

public PaimonSplit(DataSplit dataSplit, boolean isBucketUnAware) {
// Partition values in Fluss partition-name format
private final List<String> partition;

public PaimonSplit(DataSplit dataSplit, boolean isBucketUnAware, List<String> partition) {
this.dataSplit = dataSplit;
this.isBucketUnAware = isBucketUnAware;
this.partition = partition;
}

@Override
Expand All @@ -52,19 +53,7 @@ public int bucket() {

@Override
public List<String> partition() {
BinaryRow partition = dataSplit.partition();
if (partition.getFieldCount() == 0) {
return Collections.emptyList();
}

List<String> partitions = new ArrayList<>();
for (int i = 0; i < partition.getFieldCount(); i++) {
// Todo Currently, partition column must be String datatype, so we can always use
// consider it as string. Revisit here when
// #489 is finished.
partitions.add(partition.getString(i).toString());
}
return partitions;
return partition;
}
Comment on lines 55 to 57

public DataSplit dataSplit() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.fluss.lake.paimon.source;

import org.apache.fluss.config.Configuration;
import org.apache.fluss.lake.paimon.utils.PaimonPartitionUtils;
import org.apache.fluss.lake.source.Planner;
import org.apache.fluss.metadata.TablePath;

Expand All @@ -33,6 +34,7 @@
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.types.RowType;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -69,13 +71,19 @@ public List<PaimonSplit> plan() {
FileStoreTable fileStoreTable = getTable(catalog, tablePath, snapshotId);
InnerTableScan tableScan = fileStoreTable.newScan();
boolean isBucketUnAware = fileStoreTable.bucketMode() == BucketMode.BUCKET_UNAWARE;
RowType partitionType = fileStoreTable.schema().logicalPartitionType();

if (predicate != null) {
tableScan = tableScan.withFilter(predicate);
}
for (Split split : tableScan.plan().splits()) {
DataSplit dataSplit = (DataSplit) split;
splits.add(new PaimonSplit(dataSplit, isBucketUnAware));
List<String> partition =
dataSplit.partition().getFieldCount() == 0
? Collections.emptyList()
: PaimonPartitionUtils.partitionValues(
dataSplit.partition(), partitionType);
splits.add(new PaimonSplit(dataSplit, isBucketUnAware, partition));
}
}
return splits;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Serializer for paimon split. */
public class PaimonSplitSerializer implements SimpleVersionedSerializer<PaimonSplit> {
Expand All @@ -46,6 +48,11 @@ public byte[] serialize(PaimonSplit paimonSplit) throws IOException {
DataSplit dataSplit = paimonSplit.dataSplit();
InstantiationUtil.serializeObject(view, dataSplit);
view.writeBoolean(paimonSplit.isBucketUnAware());
List<String> partition = paimonSplit.partition();
view.writeInt(partition.size());
for (String value : partition) {
view.writeUTF(value);
}
Comment on lines 48 to +55
return out.toByteArray();
}

Expand All @@ -59,7 +66,12 @@ public PaimonSplit deserialize(int version, byte[] serialized) throws IOExceptio
if (version == VERSION_1) {
DataInputStream dis = new DataInputStream(in);
boolean isBucketUnAware = dis.readBoolean();
return new PaimonSplit(dataSplit, isBucketUnAware);
int size = dis.readInt();
List<String> partition = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
partition.add(dis.readUTF());
}
return new PaimonSplit(dataSplit, isBucketUnAware, partition);
} else {
throw new IOException("Unsupported PaimonSplit serialization version: " + version);
}
Comment on lines 66 to 77
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.operation.FileStoreScan;
Expand All @@ -48,7 +47,6 @@
import javax.annotation.Nullable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -601,13 +599,10 @@ private Map<String, Long> getPartitionNameToIdMapping() throws IOException {
* @return partition name string
*/
private String getPartitionNameFromBinaryRow(BinaryRow partition) {
List<String> partitionValues = new ArrayList<>();
for (int i = 0; i < partition.getFieldCount(); i++) {
// todo: consider other partition type
BinaryString binaryString = partition.getString(i);
partitionValues.add(binaryString.toString());
}
return String.join(ResolvedPartitionSpec.PARTITION_SPEC_SEPARATOR, partitionValues);
return String.join(
ResolvedPartitionSpec.PARTITION_SPEC_SEPARATOR,
PaimonPartitionUtils.partitionValues(
partition, fileStoreTable.schema().logicalPartitionType()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.lake.paimon.utils;

import org.apache.fluss.row.TimestampLtz;
import org.apache.fluss.row.TimestampNtz;
import org.apache.fluss.utils.PartitionNameConverters;

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeChecks;
import org.apache.paimon.types.RowType;

import java.util.ArrayList;
import java.util.List;

/** Extracts partition values from a Paimon partition row as Fluss partition-name strings. */
public class PaimonPartitionUtils {

private PaimonPartitionUtils() {}

/**
* Converts a Paimon partition row into Fluss partition value strings, in partition-key order.
* The output must match {@code PartitionUtils#convertValueOfType} (the Fluss-side format),
* otherwise lake-side and Fluss-side partition names won't match during union read split
* generation.
*/
public static List<String> partitionValues(BinaryRow partition, RowType partitionType) {
List<String> values = new ArrayList<>(partition.getFieldCount());
for (int i = 0; i < partition.getFieldCount(); i++) {
values.add(toFlussPartitionString(partition, i, partitionType.getTypeAt(i)));
}
return values;
}

private static String toFlussPartitionString(BinaryRow partition, int pos, DataType type) {
switch (type.getTypeRoot()) {
case CHAR:
case VARCHAR:
return partition.getString(pos).toString();
case BOOLEAN:
return Boolean.toString(partition.getBoolean(pos));
case TINYINT:
return Byte.toString(partition.getByte(pos));
case SMALLINT:
return Short.toString(partition.getShort(pos));
case INTEGER:
return Integer.toString(partition.getInt(pos));
case BIGINT:
return Long.toString(partition.getLong(pos));
case FLOAT:
return PartitionNameConverters.reformatFloat(partition.getFloat(pos));
case DOUBLE:
return PartitionNameConverters.reformatDouble(partition.getDouble(pos));
case DATE:
return PartitionNameConverters.dayToString(partition.getInt(pos));
case TIME_WITHOUT_TIME_ZONE:
return PartitionNameConverters.milliToString(partition.getInt(pos));
case BINARY:
case VARBINARY:
return PartitionNameConverters.hexString(partition.getBinary(pos));
case TIMESTAMP_WITHOUT_TIME_ZONE:
{
Timestamp ts = partition.getTimestamp(pos, DataTypeChecks.getPrecision(type));
return PartitionNameConverters.timestampToString(
TimestampNtz.fromMillis(
ts.getMillisecond(), ts.getNanoOfMillisecond()));
}
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
{
Timestamp ts = partition.getTimestamp(pos, DataTypeChecks.getPrecision(type));
return PartitionNameConverters.timestampToString(
TimestampLtz.fromEpochMillis(
ts.getMillisecond(), ts.getNanoOfMillisecond()));
}
default:
throw new IllegalArgumentException(
"Unsupported partition column type: " + type.getTypeRoot());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ void testSerializeAndDeserialize() throws Exception {

assertThat(deserialized.dataSplit()).isEqualTo(originalPaimonSplit.dataSplit());
assertThat(deserialized.isBucketUnAware()).isEqualTo(originalPaimonSplit.isBucketUnAware());
assertThat(deserialized.partition()).isEqualTo(originalPaimonSplit.partition());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.paimon.types.DataTypes;
import org.junit.jupiter.api.Test;

import java.time.LocalDate;
import java.util.Collections;
import java.util.List;

Expand Down Expand Up @@ -77,4 +78,33 @@ void testPaimonSplit() throws Exception {
assertThat(actualSplit).isEqualTo(paimonSplit.dataSplit());
assertThat(((DataSplit) actualSplit).bucket()).isEqualTo(paimonSplit.bucket());
}

@Test
void testPaimonSplitWithDatePartition() throws Exception {
int bucketNum = 1;
TablePath tablePath = TablePath.of(DEFAULT_DB, "non_string_partition_table");
Schema.Builder builder =
Schema.newBuilder()
.column("c1", DataTypes.INT())
.column("c2", DataTypes.STRING())
.column("dt", DataTypes.DATE());
builder.partitionKeys("dt");
builder.primaryKey("c1", "dt");
builder.option(CoreOptions.BUCKET.key(), String.valueOf(bucketNum));
createTable(tablePath, builder.build());
Table table = getTable(tablePath);

int epochDay = (int) LocalDate.of(2024, 3, 1).toEpochDay();
GenericRow record1 = GenericRow.of(12, BinaryString.fromString("a"), epochDay);
writeRecord(tablePath, Collections.singletonList(record1));
Snapshot snapshot = table.latestSnapshot().get();

LakeSource<PaimonSplit> lakeSource = lakeStorage.createLakeSource(tablePath);
List<PaimonSplit> paimonSplits = lakeSource.createPlanner(snapshot::id).plan();

// The DATE partition must be rendered in Fluss partition-name format ("2024-03-01"),
// not read blindly via BinaryRow.getString which yields garbage for non-String columns.
PaimonSplit paimonSplit = paimonSplits.get(0);
assertThat(paimonSplit.partition()).isEqualTo(Collections.singletonList("2024-03-01"));
}
}
Loading