From 5b29bb68ff60445bb82a664b1d12f5d27b3083ef Mon Sep 17 00:00:00 2001 From: bryndenZh Date: Sat, 20 Jun 2026 00:00:47 +0800 Subject: [PATCH 1/2] [lake] Fix union read duplicating rows for non-String partition columns For a partitioned table with table.datalake.enabled=true and a non-String partition column (DATE, INT, TIMESTAMP, etc.), default union read returned each tiered row twice because PaimonSplit#partition() read every partition field via BinaryRow.getString regardless of logical type. The lake-side partition name then never matched the Fluss-side name in LakeSplitGenerator, so the same partition was emitted as both a lake-only split and a Fluss-log split. Add PaimonPartitionUtils to render partition values in a logical-type-aware way matching PartitionUtils#convertValueOfType, compute them in PaimonSplitPlanner, and apply the same fix to DvTableReadableSnapshotRetriever. Co-Authored-By: Claude Opus 4.8 --- .../fluss/lake/paimon/source/PaimonSplit.java | 23 +--- .../paimon/source/PaimonSplitPlanner.java | 10 +- .../paimon/source/PaimonSplitSerializer.java | 14 +- .../DvTableReadableSnapshotRetriever.java | 13 +- .../paimon/utils/PaimonPartitionUtils.java | 98 ++++++++++++++ .../source/PaimonSplitSerializerTest.java | 1 + .../lake/paimon/source/PaimonSplitTest.java | 30 +++++ .../utils/PaimonPartitionUtilsTest.java | 123 ++++++++++++++++++ 8 files changed, 284 insertions(+), 28 deletions(-) create mode 100644 fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonPartitionUtils.java create mode 100644 fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/PaimonPartitionUtilsTest.java diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplit.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplit.java index 925377067e..ed47361c3c 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplit.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplit.java @@ -20,11 +20,8 @@ import org.apache.fluss.lake.source.LakeSplit; -import org.apache.paimon.data.BinaryRow; import org.apache.paimon.table.source.DataSplit; -import java.util.ArrayList; -import java.util.Collections; import java.util.List; /** Split for paimon table. */ @@ -36,9 +33,13 @@ public class PaimonSplit implements LakeSplit { private final boolean isBucketUnAware; - public PaimonSplit(DataSplit dataSplit, boolean isBucketUnAware) { + // Partition values in Fluss partition-name format + private final List partition; + + public PaimonSplit(DataSplit dataSplit, boolean isBucketUnAware, List partition) { this.dataSplit = dataSplit; this.isBucketUnAware = isBucketUnAware; + this.partition = partition; } @Override @@ -52,19 +53,7 @@ public int bucket() { @Override public List partition() { - BinaryRow partition = dataSplit.partition(); - if (partition.getFieldCount() == 0) { - return Collections.emptyList(); - } - - List partitions = new ArrayList<>(); - for (int i = 0; i < partition.getFieldCount(); i++) { - // Todo Currently, partition column must be String datatype, so we can always use - // consider it as string. Revisit here when - // #489 is finished. - partitions.add(partition.getString(i).toString()); - } - return partitions; + return partition; } public DataSplit dataSplit() { diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplitPlanner.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplitPlanner.java index db723f581f..76139e6e80 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplitPlanner.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplitPlanner.java @@ -19,6 +19,7 @@ package org.apache.fluss.lake.paimon.source; import org.apache.fluss.config.Configuration; +import org.apache.fluss.lake.paimon.utils.PaimonPartitionUtils; import org.apache.fluss.lake.source.Planner; import org.apache.fluss.metadata.TablePath; @@ -33,6 +34,7 @@ import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.InnerTableScan; import org.apache.paimon.table.source.Split; +import org.apache.paimon.types.RowType; import javax.annotation.Nullable; @@ -69,13 +71,19 @@ public List plan() { FileStoreTable fileStoreTable = getTable(catalog, tablePath, snapshotId); InnerTableScan tableScan = fileStoreTable.newScan(); boolean isBucketUnAware = fileStoreTable.bucketMode() == BucketMode.BUCKET_UNAWARE; + RowType partitionType = fileStoreTable.schema().logicalPartitionType(); if (predicate != null) { tableScan = tableScan.withFilter(predicate); } for (Split split : tableScan.plan().splits()) { DataSplit dataSplit = (DataSplit) split; - splits.add(new PaimonSplit(dataSplit, isBucketUnAware)); + List partition = + dataSplit.partition().getFieldCount() == 0 + ? Collections.emptyList() + : PaimonPartitionUtils.partitionValues( + dataSplit.partition(), partitionType); + splits.add(new PaimonSplit(dataSplit, isBucketUnAware, partition)); } } return splits; diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplitSerializer.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplitSerializer.java index c5e4d38d1d..4ba731e5de 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplitSerializer.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplitSerializer.java @@ -28,6 +28,8 @@ import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; /** Serializer for paimon split. */ public class PaimonSplitSerializer implements SimpleVersionedSerializer { @@ -46,6 +48,11 @@ public byte[] serialize(PaimonSplit paimonSplit) throws IOException { DataSplit dataSplit = paimonSplit.dataSplit(); InstantiationUtil.serializeObject(view, dataSplit); view.writeBoolean(paimonSplit.isBucketUnAware()); + List partition = paimonSplit.partition(); + view.writeInt(partition.size()); + for (String value : partition) { + view.writeUTF(value); + } return out.toByteArray(); } @@ -59,7 +66,12 @@ public PaimonSplit deserialize(int version, byte[] serialized) throws IOExceptio if (version == VERSION_1) { DataInputStream dis = new DataInputStream(in); boolean isBucketUnAware = dis.readBoolean(); - return new PaimonSplit(dataSplit, isBucketUnAware); + int size = dis.readInt(); + List partition = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + partition.add(dis.readUTF()); + } + return new PaimonSplit(dataSplit, isBucketUnAware, partition); } else { throw new IOException("Unsupported PaimonSplit serialization version: " + version); } diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java index 54c3c7f948..9d9485155d 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java @@ -35,7 +35,6 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.Snapshot; import org.apache.paimon.data.BinaryRow; -import org.apache.paimon.data.BinaryString; import org.apache.paimon.manifest.FileKind; import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.operation.FileStoreScan; @@ -48,7 +47,6 @@ import javax.annotation.Nullable; import java.io.IOException; -import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -601,13 +599,10 @@ private Map getPartitionNameToIdMapping() throws IOException { * @return partition name string */ private String getPartitionNameFromBinaryRow(BinaryRow partition) { - List partitionValues = new ArrayList<>(); - for (int i = 0; i < partition.getFieldCount(); i++) { - // todo: consider other partition type - BinaryString binaryString = partition.getString(i); - partitionValues.add(binaryString.toString()); - } - return String.join(ResolvedPartitionSpec.PARTITION_SPEC_SEPARATOR, partitionValues); + return String.join( + ResolvedPartitionSpec.PARTITION_SPEC_SEPARATOR, + PaimonPartitionUtils.partitionValues( + partition, fileStoreTable.schema().logicalPartitionType())); } @Override diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonPartitionUtils.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonPartitionUtils.java new file mode 100644 index 0000000000..967bee34bc --- /dev/null +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonPartitionUtils.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.lake.paimon.utils; + +import org.apache.fluss.row.TimestampLtz; +import org.apache.fluss.row.TimestampNtz; +import org.apache.fluss.utils.PartitionNameConverters; + +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.Timestamp; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypeChecks; +import org.apache.paimon.types.RowType; + +import java.util.ArrayList; +import java.util.List; + +/** Extracts partition values from a Paimon partition row as Fluss partition-name strings. */ +public class PaimonPartitionUtils { + + private PaimonPartitionUtils() {} + + /** + * Converts a Paimon partition row into Fluss partition value strings, in partition-key order. + * The output must match {@code PartitionUtils#convertValueOfType} (the Fluss-side format), + * otherwise lake-side and Fluss-side partition names won't match during union read split + * generation. + */ + public static List partitionValues(BinaryRow partition, RowType partitionType) { + List values = new ArrayList<>(partition.getFieldCount()); + for (int i = 0; i < partition.getFieldCount(); i++) { + values.add(toFlussPartitionString(partition, i, partitionType.getTypeAt(i))); + } + return values; + } + + private static String toFlussPartitionString(BinaryRow partition, int pos, DataType type) { + switch (type.getTypeRoot()) { + case CHAR: + case VARCHAR: + return partition.getString(pos).toString(); + case BOOLEAN: + return Boolean.toString(partition.getBoolean(pos)); + case TINYINT: + return Byte.toString(partition.getByte(pos)); + case SMALLINT: + return Short.toString(partition.getShort(pos)); + case INTEGER: + return Integer.toString(partition.getInt(pos)); + case BIGINT: + return Long.toString(partition.getLong(pos)); + case FLOAT: + return PartitionNameConverters.reformatFloat(partition.getFloat(pos)); + case DOUBLE: + return PartitionNameConverters.reformatDouble(partition.getDouble(pos)); + case DATE: + return PartitionNameConverters.dayToString(partition.getInt(pos)); + case TIME_WITHOUT_TIME_ZONE: + return PartitionNameConverters.milliToString(partition.getInt(pos)); + case BINARY: + case VARBINARY: + return PartitionNameConverters.hexString(partition.getBinary(pos)); + case TIMESTAMP_WITHOUT_TIME_ZONE: + { + Timestamp ts = partition.getTimestamp(pos, DataTypeChecks.getPrecision(type)); + return PartitionNameConverters.timestampToString( + TimestampNtz.fromMillis( + ts.getMillisecond(), ts.getNanoOfMillisecond())); + } + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + { + Timestamp ts = partition.getTimestamp(pos, DataTypeChecks.getPrecision(type)); + return PartitionNameConverters.timestampToString( + TimestampLtz.fromEpochMillis( + ts.getMillisecond(), ts.getNanoOfMillisecond())); + } + default: + throw new IllegalArgumentException( + "Unsupported partition column type: " + type.getTypeRoot()); + } + } +} diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonSplitSerializerTest.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonSplitSerializerTest.java index 3f72cea558..dc75f34e44 100644 --- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonSplitSerializerTest.java +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonSplitSerializerTest.java @@ -77,6 +77,7 @@ void testSerializeAndDeserialize() throws Exception { assertThat(deserialized.dataSplit()).isEqualTo(originalPaimonSplit.dataSplit()); assertThat(deserialized.isBucketUnAware()).isEqualTo(originalPaimonSplit.isBucketUnAware()); + assertThat(deserialized.partition()).isEqualTo(originalPaimonSplit.partition()); } @Test diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonSplitTest.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonSplitTest.java index a81800374d..9d5b5ee5de 100644 --- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonSplitTest.java +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonSplitTest.java @@ -35,6 +35,7 @@ import org.apache.paimon.types.DataTypes; import org.junit.jupiter.api.Test; +import java.time.LocalDate; import java.util.Collections; import java.util.List; @@ -77,4 +78,33 @@ void testPaimonSplit() throws Exception { assertThat(actualSplit).isEqualTo(paimonSplit.dataSplit()); assertThat(((DataSplit) actualSplit).bucket()).isEqualTo(paimonSplit.bucket()); } + + @Test + void testPaimonSplitWithDatePartition() throws Exception { + int bucketNum = 1; + TablePath tablePath = TablePath.of(DEFAULT_DB, "non_string_partition_table"); + Schema.Builder builder = + Schema.newBuilder() + .column("c1", DataTypes.INT()) + .column("c2", DataTypes.STRING()) + .column("dt", DataTypes.DATE()); + builder.partitionKeys("dt"); + builder.primaryKey("c1", "dt"); + builder.option(CoreOptions.BUCKET.key(), String.valueOf(bucketNum)); + createTable(tablePath, builder.build()); + Table table = getTable(tablePath); + + int epochDay = (int) LocalDate.of(2024, 3, 1).toEpochDay(); + GenericRow record1 = GenericRow.of(12, BinaryString.fromString("a"), epochDay); + writeRecord(tablePath, Collections.singletonList(record1)); + Snapshot snapshot = table.latestSnapshot().get(); + + LakeSource lakeSource = lakeStorage.createLakeSource(tablePath); + List paimonSplits = lakeSource.createPlanner(snapshot::id).plan(); + + // The DATE partition must be rendered in Fluss partition-name format ("2024-03-01"), + // not read blindly via BinaryRow.getString which yields garbage for non-String columns. + PaimonSplit paimonSplit = paimonSplits.get(0); + assertThat(paimonSplit.partition()).isEqualTo(Collections.singletonList("2024-03-01")); + } } diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/PaimonPartitionUtilsTest.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/PaimonPartitionUtilsTest.java new file mode 100644 index 0000000000..fd5866d18f --- /dev/null +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/PaimonPartitionUtilsTest.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.lake.paimon.utils; + +import org.apache.fluss.row.TimestampLtz; +import org.apache.fluss.row.TimestampNtz; +import org.apache.fluss.types.DataTypeRoot; +import org.apache.fluss.utils.PartitionUtils; + +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.BinaryRowWriter; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.Timestamp; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; +import org.junit.jupiter.api.Test; + +import java.time.LocalDate; +import java.util.function.Consumer; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link PaimonPartitionUtils}. */ +class PaimonPartitionUtilsTest { + + /** Each type's lake-side string must equal the Fluss-side name ({@code convertValueOfType}). */ + @Test + void testTypesMatchFlussName() { + int epochDay = (int) LocalDate.of(2024, 3, 1).toEpochDay(); + int milliOfDay = ((12 * 60 + 34) * 60 + 56) * 1000 + 789; + long ms = 1709294096123L; + int nanos = 456000; // multiple of 1000 to survive microsecond precision + byte[] bytes = {1, 2, 3, (byte) 0xAB}; + + assertMatches( + DataTypes.BOOLEAN(), w -> w.writeBoolean(0, true), true, DataTypeRoot.BOOLEAN); + assertMatches( + DataTypes.TINYINT(), w -> w.writeByte(0, (byte) 7), (byte) 7, DataTypeRoot.TINYINT); + assertMatches( + DataTypes.SMALLINT(), + w -> w.writeShort(0, (short) 300), + (short) 300, + DataTypeRoot.SMALLINT); + assertMatches(DataTypes.INT(), w -> w.writeInt(0, 42), 42, DataTypeRoot.INTEGER); + assertMatches(DataTypes.BIGINT(), w -> w.writeLong(0, 123L), 123L, DataTypeRoot.BIGINT); + assertMatches(DataTypes.FLOAT(), w -> w.writeFloat(0, 1.5f), 1.5f, DataTypeRoot.FLOAT); + assertMatches(DataTypes.DOUBLE(), w -> w.writeDouble(0, 2.5d), 2.5d, DataTypeRoot.DOUBLE); + assertMatches(DataTypes.DATE(), w -> w.writeInt(0, epochDay), epochDay, DataTypeRoot.DATE); + assertMatches( + DataTypes.TIME(), + w -> w.writeInt(0, milliOfDay), + milliOfDay, + DataTypeRoot.TIME_WITHOUT_TIME_ZONE); + assertMatches(DataTypes.BYTES(), w -> w.writeBinary(0, bytes), bytes, DataTypeRoot.BYTES); + assertMatches( + DataTypes.TIMESTAMP(6), + w -> w.writeTimestamp(0, Timestamp.fromEpochMillis(ms, nanos), 6), + TimestampNtz.fromMillis(ms, nanos), + DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE); + assertMatches( + DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6), + w -> w.writeTimestamp(0, Timestamp.fromEpochMillis(ms, nanos), 6), + TimestampLtz.fromEpochMillis(ms, nanos), + DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE); + } + + @Test + void testStringPartition() { + BinaryRow partition = new BinaryRow(1); + BinaryRowWriter writer = new BinaryRowWriter(partition); + writer.writeString(0, BinaryString.fromString("a")); + writer.complete(); + + assertThat(PaimonPartitionUtils.partitionValues(partition, RowType.of(DataTypes.STRING()))) + .containsExactly("a"); + } + + @Test + void testMultiColumnPartition() { + int epochDay = (int) LocalDate.of(2024, 3, 1).toEpochDay(); + BinaryRow partition = new BinaryRow(2); + BinaryRowWriter writer = new BinaryRowWriter(partition); + writer.writeInt(0, epochDay); + writer.writeInt(1, 42); + writer.complete(); + + assertThat( + PaimonPartitionUtils.partitionValues( + partition, RowType.of(DataTypes.DATE(), DataTypes.INT()))) + .containsExactly("2024-03-01", "42"); + } + + private static void assertMatches( + DataType paimonType, + Consumer writeValue, + Object flussValue, + DataTypeRoot flussRoot) { + BinaryRow partition = new BinaryRow(1); + BinaryRowWriter writer = new BinaryRowWriter(partition); + writeValue.accept(writer); + writer.complete(); + assertThat(PaimonPartitionUtils.partitionValues(partition, RowType.of(paimonType))) + .as("type %s", flussRoot) + .containsExactly(PartitionUtils.convertValueOfType(flussValue, flussRoot)); + } +} From f6955b6c596fec0f8e5ea418a5d92464eedabd78 Mon Sep 17 00:00:00 2001 From: bryndenZh Date: Wed, 1 Jul 2026 00:42:57 +0800 Subject: [PATCH 2/2] [lake] Address Paimon partition review comments --- .../paimon/source/PaimonSplitPlanner.java | 15 +- .../paimon/source/PaimonSplitSerializer.java | 15 +- .../DvTableReadableSnapshotRetriever.java | 6 +- .../lake/paimon/utils/PaimonConversions.java | 27 +++ .../utils/PaimonDataTypeToFlussDataType.java | 175 ++++++++++++++++++ .../paimon/utils/PaimonPartitionUtils.java | 98 ---------- ...lsTest.java => PaimonConversionsTest.java} | 44 +++-- 7 files changed, 259 insertions(+), 121 deletions(-) create mode 100644 fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonDataTypeToFlussDataType.java delete mode 100644 fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonPartitionUtils.java rename fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/{PaimonPartitionUtilsTest.java => PaimonConversionsTest.java} (72%) diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplitPlanner.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplitPlanner.java index 76139e6e80..e20e3fe8b3 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplitPlanner.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplitPlanner.java @@ -19,14 +19,15 @@ package org.apache.fluss.lake.paimon.source; import org.apache.fluss.config.Configuration; -import org.apache.fluss.lake.paimon.utils.PaimonPartitionUtils; import org.apache.fluss.lake.source.Planner; import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.types.RowType; import org.apache.paimon.CoreOptions; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.CatalogFactory; +import org.apache.paimon.data.BinaryRow; import org.apache.paimon.options.Options; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.table.BucketMode; @@ -34,7 +35,6 @@ import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.InnerTableScan; import org.apache.paimon.table.source.Split; -import org.apache.paimon.types.RowType; import javax.annotation.Nullable; @@ -42,6 +42,8 @@ import java.util.Collections; import java.util.List; +import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toFlussPartitionValues; +import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toFlussRowType; import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimon; /** Split panner for paimon table. */ @@ -71,18 +73,19 @@ public List plan() { FileStoreTable fileStoreTable = getTable(catalog, tablePath, snapshotId); InnerTableScan tableScan = fileStoreTable.newScan(); boolean isBucketUnAware = fileStoreTable.bucketMode() == BucketMode.BUCKET_UNAWARE; - RowType partitionType = fileStoreTable.schema().logicalPartitionType(); + RowType flussPartitionType = + toFlussRowType(fileStoreTable.schema().logicalPartitionType()); if (predicate != null) { tableScan = tableScan.withFilter(predicate); } for (Split split : tableScan.plan().splits()) { DataSplit dataSplit = (DataSplit) split; + BinaryRow partitionRow = dataSplit.partition(); List partition = - dataSplit.partition().getFieldCount() == 0 + partitionRow.getFieldCount() == 0 ? Collections.emptyList() - : PaimonPartitionUtils.partitionValues( - dataSplit.partition(), partitionType); + : toFlussPartitionValues(partitionRow, flussPartitionType); splits.add(new PaimonSplit(dataSplit, isBucketUnAware, partition)); } } diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplitSerializer.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplitSerializer.java index 4ba731e5de..f761efd50e 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplitSerializer.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplitSerializer.java @@ -29,16 +29,19 @@ import java.io.DataInputStream; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; /** Serializer for paimon split. */ public class PaimonSplitSerializer implements SimpleVersionedSerializer { private static final int VERSION_1 = 1; + // VERSION_2 additionally persists the partition values. + private static final int VERSION_2 = 2; @Override public int getVersion() { - return VERSION_1; + return VERSION_2; } @Override @@ -62,10 +65,14 @@ public PaimonSplit deserialize(int version, byte[] serialized) throws IOExceptio DataSplit dataSplit; try { dataSplit = InstantiationUtil.deserializeObject(in, getClass().getClassLoader()); - + DataInputStream dis = new DataInputStream(in); + boolean isBucketUnAware = dis.readBoolean(); if (version == VERSION_1) { - DataInputStream dis = new DataInputStream(in); - boolean isBucketUnAware = dis.readBoolean(); + // VERSION_1 payloads do not contain partition values. Keep them readable, but + // don't try to reconstruct partition names from DataSplit because non-string + // partition columns require type information that is not present in the payload. + return new PaimonSplit(dataSplit, isBucketUnAware, Collections.emptyList()); + } else if (version == VERSION_2) { int size = dis.readInt(); List partition = new ArrayList<>(size); for (int i = 0; i < size; i++) { diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java index 9d9485155d..0d55edc484 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java @@ -601,8 +601,10 @@ private Map getPartitionNameToIdMapping() throws IOException { private String getPartitionNameFromBinaryRow(BinaryRow partition) { return String.join( ResolvedPartitionSpec.PARTITION_SPEC_SEPARATOR, - PaimonPartitionUtils.partitionValues( - partition, fileStoreTable.schema().logicalPartitionType())); + PaimonConversions.toFlussPartitionValues( + partition, + PaimonConversions.toFlussRowType( + fileStoreTable.schema().logicalPartitionType()))); } @Override diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java index 94166e2d05..4bb12a0f42 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java @@ -28,12 +28,15 @@ import org.apache.fluss.row.GenericRow; import org.apache.fluss.row.InternalRow; import org.apache.fluss.types.DataTypeRoot; +import org.apache.fluss.utils.PartitionUtils; import org.apache.paimon.CoreOptions; import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.BinaryRow; import org.apache.paimon.options.Options; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; +import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; @@ -116,6 +119,30 @@ public static Object toPaimonLiteral(DataType dataType, Object flussLiteral) { .getFieldOrNull(flussRowAsPaimonRow); } + public static org.apache.fluss.types.RowType toFlussRowType(RowType paimonRowType) { + org.apache.fluss.types.RowType.Builder builder = org.apache.fluss.types.RowType.builder(); + for (DataField field : paimonRowType.getFields()) { + builder.field( + field.name(), field.type().accept(PaimonDataTypeToFlussDataType.INSTANCE)); + } + return builder.build(); + } + + /** + * Renders a Paimon partition row into Fluss partition value strings, in partition-key order. + */ + public static List toFlussPartitionValues( + BinaryRow partition, org.apache.fluss.types.RowType flussPartitionType) { + PaimonRowAsFlussRow flussRow = new PaimonRowAsFlussRow().replaceRow(partition); + List values = new ArrayList<>(partition.getFieldCount()); + for (int i = 0; i < partition.getFieldCount(); i++) { + org.apache.fluss.types.DataType flussType = flussPartitionType.getTypeAt(i); + Object value = InternalRow.createFieldGetter(flussType, i).getFieldOrNull(flussRow); + values.add(PartitionUtils.convertValueOfType(value, flussType.getTypeRoot())); + } + return values; + } + public static List toPaimonSchemaChanges(List tableChanges) { List schemaChanges = new ArrayList<>(tableChanges.size()); diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonDataTypeToFlussDataType.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonDataTypeToFlussDataType.java new file mode 100644 index 0000000000..36f4678f1b --- /dev/null +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonDataTypeToFlussDataType.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.lake.paimon.utils; + +import org.apache.fluss.types.DataTypes; + +import org.apache.paimon.types.ArrayType; +import org.apache.paimon.types.BigIntType; +import org.apache.paimon.types.BinaryType; +import org.apache.paimon.types.BooleanType; +import org.apache.paimon.types.CharType; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypeDefaultVisitor; +import org.apache.paimon.types.DateType; +import org.apache.paimon.types.DecimalType; +import org.apache.paimon.types.DoubleType; +import org.apache.paimon.types.FloatType; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.LocalZonedTimestampType; +import org.apache.paimon.types.MapType; +import org.apache.paimon.types.RowType; +import org.apache.paimon.types.SmallIntType; +import org.apache.paimon.types.TimeType; +import org.apache.paimon.types.TimestampType; +import org.apache.paimon.types.TinyIntType; +import org.apache.paimon.types.VarBinaryType; +import org.apache.paimon.types.VarCharType; + +/** + * Convert from Paimon's data type to Fluss's data type (inverse of {@link + * FlussDataTypeToPaimonDataType}). + */ +public class PaimonDataTypeToFlussDataType + extends DataTypeDefaultVisitor { + + public static final PaimonDataTypeToFlussDataType INSTANCE = + new PaimonDataTypeToFlussDataType(); + + @Override + public org.apache.fluss.types.DataType visit(CharType charType) { + return withNullability(DataTypes.CHAR(charType.getLength()), charType); + } + + @Override + public org.apache.fluss.types.DataType visit(VarCharType varCharType) { + return withNullability(DataTypes.STRING(), varCharType); + } + + @Override + public org.apache.fluss.types.DataType visit(BooleanType booleanType) { + return withNullability(DataTypes.BOOLEAN(), booleanType); + } + + @Override + public org.apache.fluss.types.DataType visit(BinaryType binaryType) { + return withNullability(DataTypes.BINARY(binaryType.getLength()), binaryType); + } + + @Override + public org.apache.fluss.types.DataType visit(VarBinaryType varBinaryType) { + return withNullability(DataTypes.BYTES(), varBinaryType); + } + + @Override + public org.apache.fluss.types.DataType visit(DecimalType decimalType) { + return withNullability( + DataTypes.DECIMAL(decimalType.getPrecision(), decimalType.getScale()), decimalType); + } + + @Override + public org.apache.fluss.types.DataType visit(TinyIntType tinyIntType) { + return withNullability(DataTypes.TINYINT(), tinyIntType); + } + + @Override + public org.apache.fluss.types.DataType visit(SmallIntType smallIntType) { + return withNullability(DataTypes.SMALLINT(), smallIntType); + } + + @Override + public org.apache.fluss.types.DataType visit(IntType intType) { + return withNullability(DataTypes.INT(), intType); + } + + @Override + public org.apache.fluss.types.DataType visit(BigIntType bigIntType) { + return withNullability(DataTypes.BIGINT(), bigIntType); + } + + @Override + public org.apache.fluss.types.DataType visit(FloatType floatType) { + return withNullability(DataTypes.FLOAT(), floatType); + } + + @Override + public org.apache.fluss.types.DataType visit(DoubleType doubleType) { + return withNullability(DataTypes.DOUBLE(), doubleType); + } + + @Override + public org.apache.fluss.types.DataType visit(DateType dateType) { + return withNullability(DataTypes.DATE(), dateType); + } + + @Override + public org.apache.fluss.types.DataType visit(TimeType timeType) { + return withNullability(DataTypes.TIME(timeType.getPrecision()), timeType); + } + + @Override + public org.apache.fluss.types.DataType visit(TimestampType timestampType) { + return withNullability(DataTypes.TIMESTAMP(timestampType.getPrecision()), timestampType); + } + + @Override + public org.apache.fluss.types.DataType visit(LocalZonedTimestampType localZonedTimestampType) { + return withNullability( + DataTypes.TIMESTAMP_LTZ(localZonedTimestampType.getPrecision()), + localZonedTimestampType); + } + + @Override + public org.apache.fluss.types.DataType visit(ArrayType arrayType) { + return withNullability(DataTypes.ARRAY(arrayType.getElementType().accept(this)), arrayType); + } + + @Override + public org.apache.fluss.types.DataType visit(MapType mapType) { + return withNullability( + DataTypes.MAP( + mapType.getKeyType().accept(this), mapType.getValueType().accept(this)), + mapType); + } + + @Override + public org.apache.fluss.types.DataType visit(RowType rowType) { + org.apache.fluss.types.RowType.Builder builder = org.apache.fluss.types.RowType.builder(); + for (DataField field : rowType.getFields()) { + org.apache.fluss.types.DataType fieldType = field.type().accept(this); + if (field.description() == null) { + builder.field(field.name(), fieldType); + } else { + builder.field(field.name(), fieldType, field.description()); + } + } + return withNullability(builder.build(), rowType); + } + + @Override + protected org.apache.fluss.types.DataType defaultMethod(DataType dataType) { + throw new UnsupportedOperationException( + "Unsupported data type to convert to Fluss: " + dataType.getTypeRoot()); + } + + private static org.apache.fluss.types.DataType withNullability( + org.apache.fluss.types.DataType flussType, DataType paimonType) { + return flussType.copy(paimonType.isNullable()); + } +} diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonPartitionUtils.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonPartitionUtils.java deleted file mode 100644 index 967bee34bc..0000000000 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonPartitionUtils.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.fluss.lake.paimon.utils; - -import org.apache.fluss.row.TimestampLtz; -import org.apache.fluss.row.TimestampNtz; -import org.apache.fluss.utils.PartitionNameConverters; - -import org.apache.paimon.data.BinaryRow; -import org.apache.paimon.data.Timestamp; -import org.apache.paimon.types.DataType; -import org.apache.paimon.types.DataTypeChecks; -import org.apache.paimon.types.RowType; - -import java.util.ArrayList; -import java.util.List; - -/** Extracts partition values from a Paimon partition row as Fluss partition-name strings. */ -public class PaimonPartitionUtils { - - private PaimonPartitionUtils() {} - - /** - * Converts a Paimon partition row into Fluss partition value strings, in partition-key order. - * The output must match {@code PartitionUtils#convertValueOfType} (the Fluss-side format), - * otherwise lake-side and Fluss-side partition names won't match during union read split - * generation. - */ - public static List partitionValues(BinaryRow partition, RowType partitionType) { - List values = new ArrayList<>(partition.getFieldCount()); - for (int i = 0; i < partition.getFieldCount(); i++) { - values.add(toFlussPartitionString(partition, i, partitionType.getTypeAt(i))); - } - return values; - } - - private static String toFlussPartitionString(BinaryRow partition, int pos, DataType type) { - switch (type.getTypeRoot()) { - case CHAR: - case VARCHAR: - return partition.getString(pos).toString(); - case BOOLEAN: - return Boolean.toString(partition.getBoolean(pos)); - case TINYINT: - return Byte.toString(partition.getByte(pos)); - case SMALLINT: - return Short.toString(partition.getShort(pos)); - case INTEGER: - return Integer.toString(partition.getInt(pos)); - case BIGINT: - return Long.toString(partition.getLong(pos)); - case FLOAT: - return PartitionNameConverters.reformatFloat(partition.getFloat(pos)); - case DOUBLE: - return PartitionNameConverters.reformatDouble(partition.getDouble(pos)); - case DATE: - return PartitionNameConverters.dayToString(partition.getInt(pos)); - case TIME_WITHOUT_TIME_ZONE: - return PartitionNameConverters.milliToString(partition.getInt(pos)); - case BINARY: - case VARBINARY: - return PartitionNameConverters.hexString(partition.getBinary(pos)); - case TIMESTAMP_WITHOUT_TIME_ZONE: - { - Timestamp ts = partition.getTimestamp(pos, DataTypeChecks.getPrecision(type)); - return PartitionNameConverters.timestampToString( - TimestampNtz.fromMillis( - ts.getMillisecond(), ts.getNanoOfMillisecond())); - } - case TIMESTAMP_WITH_LOCAL_TIME_ZONE: - { - Timestamp ts = partition.getTimestamp(pos, DataTypeChecks.getPrecision(type)); - return PartitionNameConverters.timestampToString( - TimestampLtz.fromEpochMillis( - ts.getMillisecond(), ts.getNanoOfMillisecond())); - } - default: - throw new IllegalArgumentException( - "Unsupported partition column type: " + type.getTypeRoot()); - } - } -} diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/PaimonPartitionUtilsTest.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/PaimonConversionsTest.java similarity index 72% rename from fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/PaimonPartitionUtilsTest.java rename to fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/PaimonConversionsTest.java index fd5866d18f..45188b261c 100644 --- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/PaimonPartitionUtilsTest.java +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/PaimonConversionsTest.java @@ -18,8 +18,6 @@ package org.apache.fluss.lake.paimon.utils; -import org.apache.fluss.row.TimestampLtz; -import org.apache.fluss.row.TimestampNtz; import org.apache.fluss.types.DataTypeRoot; import org.apache.fluss.utils.PartitionUtils; @@ -37,8 +35,8 @@ import static org.assertj.core.api.Assertions.assertThat; -/** Test for {@link PaimonPartitionUtils}. */ -class PaimonPartitionUtilsTest { +/** Test for partition value conversion in {@link PaimonConversions}. */ +class PaimonConversionsTest { /** Each type's lake-side string must equal the Fluss-side name ({@code convertValueOfType}). */ @Test @@ -72,12 +70,12 @@ void testTypesMatchFlussName() { assertMatches( DataTypes.TIMESTAMP(6), w -> w.writeTimestamp(0, Timestamp.fromEpochMillis(ms, nanos), 6), - TimestampNtz.fromMillis(ms, nanos), + org.apache.fluss.row.TimestampNtz.fromMillis(ms, nanos), DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE); assertMatches( DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6), w -> w.writeTimestamp(0, Timestamp.fromEpochMillis(ms, nanos), 6), - TimestampLtz.fromEpochMillis(ms, nanos), + org.apache.fluss.row.TimestampLtz.fromEpochMillis(ms, nanos), DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE); } @@ -88,7 +86,7 @@ void testStringPartition() { writer.writeString(0, BinaryString.fromString("a")); writer.complete(); - assertThat(PaimonPartitionUtils.partitionValues(partition, RowType.of(DataTypes.STRING()))) + assertThat(flussPartitionNames(partition, RowType.of(DataTypes.STRING()))) .containsExactly("a"); } @@ -101,12 +99,36 @@ void testMultiColumnPartition() { writer.writeInt(1, 42); writer.complete(); - assertThat( - PaimonPartitionUtils.partitionValues( - partition, RowType.of(DataTypes.DATE(), DataTypes.INT()))) + assertThat(flussPartitionNames(partition, RowType.of(DataTypes.DATE(), DataTypes.INT()))) .containsExactly("2024-03-01", "42"); } + @Test + void testNestedTypeConversion() { + RowType paimon = + RowType.of( + DataTypes.ARRAY(DataTypes.INT()), + DataTypes.MAP(DataTypes.STRING(), DataTypes.BIGINT()), + RowType.of(DataTypes.INT(), DataTypes.STRING())); + org.apache.fluss.types.RowType fluss = PaimonConversions.toFlussRowType(paimon); + + assertThat(fluss.getTypeAt(0).getTypeRoot()).isEqualTo(DataTypeRoot.ARRAY); + assertThat( + ((org.apache.fluss.types.ArrayType) fluss.getTypeAt(0)) + .getElementType() + .getTypeRoot()) + .isEqualTo(DataTypeRoot.INTEGER); + assertThat(fluss.getTypeAt(1).getTypeRoot()).isEqualTo(DataTypeRoot.MAP); + assertThat(fluss.getTypeAt(2).getTypeRoot()).isEqualTo(DataTypeRoot.ROW); + } + + /** Test helper: takes a Paimon partition type and runs the full Paimon -> Fluss conversion. */ + private static java.util.List flussPartitionNames( + BinaryRow partition, RowType paimonPartitionType) { + return PaimonConversions.toFlussPartitionValues( + partition, PaimonConversions.toFlussRowType(paimonPartitionType)); + } + private static void assertMatches( DataType paimonType, Consumer writeValue, @@ -116,7 +138,7 @@ private static void assertMatches( BinaryRowWriter writer = new BinaryRowWriter(partition); writeValue.accept(writer); writer.complete(); - assertThat(PaimonPartitionUtils.partitionValues(partition, RowType.of(paimonType))) + assertThat(flussPartitionNames(partition, RowType.of(paimonType))) .as("type %s", flussRoot) .containsExactly(PartitionUtils.convertValueOfType(flussValue, flussRoot)); }