From 23db9fc196b4a91171f6cb0855ce4ee94bccc0af Mon Sep 17 00:00:00 2001 From: "lisizhuo.lsz" Date: Fri, 26 Jun 2026 17:10:13 +0800 Subject: [PATCH 1/4] [core] Support reading shared-shredding map --- .../shredding/MapSharedShreddingReader.java | 355 ++++++++++++++++++ .../shredding/MapSharedShreddingUtils.java | 39 ++ .../MapSharedShreddingReaderTest.java | 272 ++++++++++++++ .../MapSharedShreddingUtilsTest.java | 98 +++++ 4 files changed, 764 insertions(+) create mode 100644 paimon-common/src/main/java/org/apache/paimon/data/shredding/MapSharedShreddingReader.java create mode 100644 paimon-common/src/test/java/org/apache/paimon/data/shredding/MapSharedShreddingReaderTest.java diff --git a/paimon-common/src/main/java/org/apache/paimon/data/shredding/MapSharedShreddingReader.java b/paimon-common/src/main/java/org/apache/paimon/data/shredding/MapSharedShreddingReader.java new file mode 100644 index 000000000000..1c58d660c16f --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/data/shredding/MapSharedShreddingReader.java @@ -0,0 +1,355 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.data.shredding; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.Blob; +import org.apache.paimon.data.Decimal; +import org.apache.paimon.data.GenericMap; +import org.apache.paimon.data.InternalArray; +import org.apache.paimon.data.InternalMap; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.InternalVector; +import org.apache.paimon.data.Timestamp; +import org.apache.paimon.data.variant.Variant; +import org.apache.paimon.fs.Path; +import org.apache.paimon.reader.FileRecordIterator; +import org.apache.paimon.reader.FileRecordReader; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.MapType; +import org.apache.paimon.types.RowKind; +import org.apache.paimon.types.RowType; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.LinkedHashMap; +import java.util.Map; + +/** + * A reader wrapper that rebuilds logical MAP values from shared-shredding physical ROW values. + * + *

The wrapped format reader reads the physical schema stored in a file. This reader presents the + * original logical schema to upper layers by lazily converting only shared-shredding MAP fields + * when {@link InternalRow#getMap(int)} is called. + */ +public class MapSharedShreddingReader implements FileRecordReader { + + private final FileRecordReader reader; + private final RowType logicalType; + private final Map contextByFieldIndex; + + public MapSharedShreddingReader( + FileRecordReader reader, + RowType logicalType, + Map fieldMetas) { + this.reader = reader; + this.logicalType = logicalType; + this.contextByFieldIndex = createContexts(logicalType, fieldMetas); + } + + public static Map readSharedShreddingMetas( + Map> fieldMetadata) { + Map metas = new LinkedHashMap<>(); + for (Map.Entry> entry : fieldMetadata.entrySet()) { + if (MapSharedShreddingUtils.hasShreddingMetadata(entry.getValue())) { + metas.put( + entry.getKey(), + MapSharedShreddingUtils.deserializeMetadata( + entry.getValue(), + MapSharedShreddingDefine.DEFAULT_DICT_COMPRESSION)); + } + } + return metas; + } + + private static Map createContexts( + RowType logicalType, Map fieldMetas) { + Map contexts = new LinkedHashMap<>(); + for (int i = 0; i < logicalType.getFieldCount(); i++) { + DataField field = logicalType.getFields().get(i); + MapSharedShreddingFieldMeta fieldMeta = fieldMetas.get(field.name()); + if (fieldMeta != null) { + contexts.put(i, new SharedShreddingContext(fieldMeta, field.type())); + } + } + return contexts; + } + + @Nullable + @Override + public FileRecordIterator readBatch() throws IOException { + FileRecordIterator iterator = reader.readBatch(); + if (iterator == null) { + return null; + } + return new SharedShreddingIterator(iterator); + } + + @Override + public void close() throws IOException { + reader.close(); + } + + private class SharedShreddingIterator implements FileRecordIterator { + + private final FileRecordIterator iterator; + + private SharedShreddingIterator(FileRecordIterator iterator) { + this.iterator = iterator; + } + + @Override + public long returnedPosition() { + return iterator.returnedPosition(); + } + + @Override + public Path filePath() { + return iterator.filePath(); + } + + @Nullable + @Override + public InternalRow next() throws IOException { + InternalRow row = iterator.next(); + if (row == null) { + return null; + } + return new SharedShreddingRow(row); + } + + @Override + public void releaseBatch() { + iterator.releaseBatch(); + } + } + + private class SharedShreddingRow implements InternalRow { + + private final InternalRow row; + + private SharedShreddingRow(InternalRow row) { + this.row = row; + } + + @Override + public int getFieldCount() { + return logicalType.getFieldCount(); + } + + @Override + public RowKind getRowKind() { + return row.getRowKind(); + } + + @Override + public void setRowKind(RowKind kind) { + row.setRowKind(kind); + } + + @Override + public boolean isNullAt(int pos) { + return row.isNullAt(pos); + } + + @Override + public boolean getBoolean(int pos) { + return row.getBoolean(pos); + } + + @Override + public byte getByte(int pos) { + return row.getByte(pos); + } + + @Override + public short getShort(int pos) { + return row.getShort(pos); + } + + @Override + public int getInt(int pos) { + return row.getInt(pos); + } + + @Override + public long getLong(int pos) { + return row.getLong(pos); + } + + @Override + public float getFloat(int pos) { + return row.getFloat(pos); + } + + @Override + public double getDouble(int pos) { + return row.getDouble(pos); + } + + @Override + public BinaryString getString(int pos) { + return row.getString(pos); + } + + @Override + public Decimal getDecimal(int pos, int precision, int scale) { + return row.getDecimal(pos, precision, scale); + } + + @Override + public Timestamp getTimestamp(int pos, int precision) { + return row.getTimestamp(pos, precision); + } + + @Override + public byte[] getBinary(int pos) { + return row.getBinary(pos); + } + + @Override + public Variant getVariant(int pos) { + return row.getVariant(pos); + } + + @Override + public Blob getBlob(int pos) { + return row.getBlob(pos); + } + + @Override + public InternalArray getArray(int pos) { + return row.getArray(pos); + } + + @Override + public InternalVector getVector(int pos) { + return row.getVector(pos); + } + + @Override + public InternalMap getMap(int pos) { + SharedShreddingContext context = contextByFieldIndex.get(pos); + if (context == null) { + return row.getMap(pos); + } + if (row.isNullAt(pos)) { + return null; + } + InternalRow physicalRow = row.getRow(pos, context.numPhysicalFields); + return rebuildLogicalMap(physicalRow, context); + } + + @Override + public InternalRow getRow(int pos, int numFields) { + return row.getRow(pos, numFields); + } + } + + private static InternalMap rebuildLogicalMap( + InternalRow physicalRow, SharedShreddingContext context) { + if (physicalRow.isNullAt(0)) { + throw new IllegalArgumentException( + "Shared-shredding field mapping cannot be null in a non-null physical row."); + } + InternalArray fieldMapping = physicalRow.getArray(0); + Map result = new LinkedHashMap<>(); + int numMappedColumns = Math.min(context.numColumns, fieldMapping.size()); + for (int column = 0; column < numMappedColumns; column++) { + if (fieldMapping.isNullAt(column)) { + throw new IllegalArgumentException( + "Shared-shredding field mapping element cannot be null."); + } + int fieldId = fieldMapping.getInt(column); + if (fieldId < 0) { + continue; + } + BinaryString fieldName = context.nameById.get(fieldId); + if (fieldName == null) { + throw new IllegalArgumentException( + "Cannot find shared-shredding field id " + fieldId + " in metadata."); + } + int valuePosition = column + 1; + if (valuePosition >= physicalRow.getFieldCount()) { + throw new IllegalArgumentException( + "Cannot find shared-shredding physical column " + + MapSharedShreddingDefine.physicalColumnName(column) + + "."); + } + // TODO Support rebuilding in the user requested selected-key order once key-level + // projection is pushed down. Full map reads currently follow the physical/metadata + // layout order. + result.put(fieldName, context.valueGetters[column].getFieldOrNull(physicalRow)); + } + if (context.overflowPosition < physicalRow.getFieldCount() + && !physicalRow.isNullAt(context.overflowPosition)) { + InternalMap overflow = physicalRow.getMap(context.overflowPosition); + InternalArray keys = overflow.keyArray(); + InternalArray values = overflow.valueArray(); + for (int i = 0; i < overflow.size(); i++) { + if (keys.isNullAt(i)) { + throw new IllegalArgumentException( + "Shared-shredding overflow field id cannot be null."); + } + int fieldId = keys.getInt(i); + BinaryString fieldName = context.nameById.get(fieldId); + if (fieldName == null) { + throw new IllegalArgumentException( + "Cannot find shared-shredding field id " + fieldId + " in metadata."); + } + result.put(fieldName, context.overflowValueGetter.getElementOrNull(values, i)); + } + } + return new GenericMap(result); + } + + private static class SharedShreddingContext { + + private final Map nameById; + private final InternalRow.FieldGetter[] valueGetters; + private final InternalArray.ElementGetter overflowValueGetter; + private final int numColumns; + private final int overflowPosition; + private final int numPhysicalFields; + + private SharedShreddingContext(MapSharedShreddingFieldMeta fieldMeta, DataType fieldType) { + MapType mapType = (MapType) fieldType; + this.nameById = new LinkedHashMap<>(); + for (Map.Entry entry : fieldMeta.nameToId().entrySet()) { + // ordered by dict + this.nameById.put(entry.getValue(), BinaryString.fromString(entry.getKey())); + } + this.valueGetters = new InternalRow.FieldGetter[fieldMeta.numColumns()]; + for (int i = 0; i < fieldMeta.numColumns(); i++) { + // plus 1 to skip __field_mapping + this.valueGetters[i] = + InternalRow.createFieldGetter(mapType.getValueType(), i + 1); + } + this.overflowValueGetter = InternalArray.createElementGetter(mapType.getValueType()); + this.numColumns = fieldMeta.numColumns(); + this.overflowPosition = fieldMeta.numColumns() + 1; + this.numPhysicalFields = + fieldMeta.overflowFieldSet().isEmpty() + ? fieldMeta.numColumns() + 1 + : fieldMeta.numColumns() + 2; + } + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/data/shredding/MapSharedShreddingUtils.java b/paimon-common/src/main/java/org/apache/paimon/data/shredding/MapSharedShreddingUtils.java index 6f0cd879c52a..cc5dab13fdc2 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/shredding/MapSharedShreddingUtils.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/shredding/MapSharedShreddingUtils.java @@ -111,6 +111,45 @@ public static Map buildColumnToNumColumns( return fieldToNumColumns; } + public static List getPhysicalColumnIndices( + MapSharedShreddingFieldMeta fieldMeta, String fieldName) { + Integer fieldId = fieldMeta.nameToId().get(fieldName); + if (fieldId == null) { + throw new IllegalArgumentException( + "cannot find field " + fieldName + " in map shared shredding meta"); + } + List columns = fieldMeta.fieldToColumns().get(fieldId); + if (columns == null) { + throw new IllegalArgumentException( + "cannot find field id " + + fieldId + + " in field_to_columns in map shared shredding meta"); + } + return columns; + } + + public static boolean isOverflowField(MapSharedShreddingFieldMeta fieldMeta, String fieldName) { + Integer fieldId = fieldMeta.nameToId().get(fieldName); + if (fieldId == null) { + throw new IllegalArgumentException( + "cannot find field " + fieldName + " in map shared shredding meta"); + } + return fieldMeta.overflowFieldSet().contains(fieldId); + } + + public static DataType buildSpecificPhysicalStructType( + DataType valueType, Set physicalColumnIds, boolean includeOverflow) { + RowType.Builder builder = RowType.builder(); + builder.field(MapSharedShreddingDefine.FIELD_MAPPING, new ArrayType(new IntType())); + for (Integer physicalColumnId : new TreeSet<>(physicalColumnIds)) { + builder.field(MapSharedShreddingDefine.physicalColumnName(physicalColumnId), valueType); + } + if (includeOverflow) { + builder.field(MapSharedShreddingDefine.OVERFLOW, new MapType(new IntType(), valueType)); + } + return builder.build(); + } + public static void serializeMetadata( MapSharedShreddingFieldMeta fieldMeta, String compression, diff --git a/paimon-common/src/test/java/org/apache/paimon/data/shredding/MapSharedShreddingReaderTest.java b/paimon-common/src/test/java/org/apache/paimon/data/shredding/MapSharedShreddingReaderTest.java new file mode 100644 index 000000000000..84cae7b981ad --- /dev/null +++ b/paimon-common/src/test/java/org/apache/paimon/data/shredding/MapSharedShreddingReaderTest.java @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.data.shredding; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericArray; +import org.apache.paimon.data.GenericMap; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalMap; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.fs.Path; +import org.apache.paimon.reader.FileRecordIterator; +import org.apache.paimon.reader.FileRecordReader; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; + +import org.junit.jupiter.api.Test; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.TreeSet; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link MapSharedShreddingReader}. */ +class MapSharedShreddingReaderTest { + + @Test + void testRebuildLogicalMap() throws IOException { + RowType logicalType = + DataTypes.ROW( + DataTypes.FIELD(0, "id", DataTypes.INT()), + DataTypes.FIELD( + 1, "tags", DataTypes.MAP(DataTypes.STRING(), DataTypes.BIGINT()))); + Map metas = new TreeMap<>(); + metas.put( + "tags", + new MapSharedShreddingFieldMeta( + nameToId("a", 0, "b", 1, "c", 2), + fieldToColumns( + 0, Collections.singletonList(0), + 1, Collections.singletonList(1)), + new TreeSet<>(Collections.singletonList(2)), + 2, + 3)); + + List physicalRows = + Arrays.asList( + GenericRow.of( + 1, + GenericRow.of( + new GenericArray(new int[] {0, 1}), + 10L, + 20L, + null)), + GenericRow.of( + 2, + GenericRow.of( + new GenericArray(new int[] {0, -1}), + 40L, + null, + intKeyMap(2, null))), + GenericRow.of( + 3, + GenericRow.of( + new GenericArray(new int[] {1, -1}), + null, + null, + null)), + GenericRow.of(4, null)); + + try (MapSharedShreddingReader reader = + new MapSharedShreddingReader( + new InMemoryReader(physicalRows), logicalType, metas)) { + FileRecordIterator batch = reader.readBatch(); + assertThat(batch).isNotNull(); + + InternalRow row = batch.next(); + assertThat(row.getInt(0)).isEqualTo(1); + assertThat(row.getMap(1)).isEqualTo(stringKeyMap("a", 10L, "b", 20L)); + + row = batch.next(); + assertThat(row.getInt(0)).isEqualTo(2); + assertThat(row.getMap(1)).isEqualTo(stringKeyMap("a", 40L, "c", null)); + + row = batch.next(); + assertThat(row.getInt(0)).isEqualTo(3); + assertThat(row.getMap(1)).isEqualTo(stringKeyMap("b", null)); + + row = batch.next(); + assertThat(row.getInt(0)).isEqualTo(4); + assertThat(row.isNullAt(1)).isTrue(); + assertThat(row.getMap(1)).isNull(); + + assertThat(batch.next()).isNull(); + batch.releaseBatch(); + assertThat(reader.readBatch()).isNull(); + } + } + + @Test + void testInvalidNullFieldMapping() throws IOException { + assertInvalidPhysicalTags( + GenericRow.of(null, 10L, 20L, null), + "Shared-shredding field mapping cannot be null"); + } + + @Test + void testInvalidNullFieldMappingElement() throws IOException { + assertInvalidPhysicalTags( + GenericRow.of(new GenericArray(new Object[] {0, null}), 10L, 20L, null), + "Shared-shredding field mapping element cannot be null"); + } + + @Test + void testInvalidUnknownFieldId() throws IOException { + assertInvalidPhysicalTags( + GenericRow.of(new GenericArray(new int[] {0, 99}), 10L, 20L, null), + "Cannot find shared-shredding field id 99 in metadata"); + } + + @Test + void testInvalidUnknownOverflowFieldId() throws IOException { + assertInvalidPhysicalTags( + GenericRow.of( + new GenericArray(new int[] {0, -1}), 10L, null, intKeyMap(99, 20L)), + "Cannot find shared-shredding field id 99 in metadata"); + } + + private static void assertInvalidPhysicalTags(InternalRow physicalTags, String message) + throws IOException { + RowType logicalType = + DataTypes.ROW( + DataTypes.FIELD(0, "id", DataTypes.INT()), + DataTypes.FIELD( + 1, "tags", DataTypes.MAP(DataTypes.STRING(), DataTypes.BIGINT()))); + try (MapSharedShreddingReader reader = + new MapSharedShreddingReader( + new InMemoryReader( + Collections.singletonList(GenericRow.of(1, physicalTags))), + logicalType, + metas())) { + FileRecordIterator batch = reader.readBatch(); + InternalRow row = batch.next(); + assertThatThrownBy(() -> row.getMap(1)).hasMessageContaining(message); + batch.releaseBatch(); + } + } + + private static Map metas() { + Map metas = new TreeMap<>(); + metas.put( + "tags", + new MapSharedShreddingFieldMeta( + nameToId("a", 0, "b", 1, "c", 2), + fieldToColumns( + 0, Collections.singletonList(0), + 1, Collections.singletonList(1)), + new TreeSet<>(Collections.singletonList(2)), + 2, + 3)); + return metas; + } + + private static Map nameToId(Object... kvs) { + Map map = new TreeMap<>(); + for (int i = 0; i < kvs.length; i += 2) { + map.put((String) kvs[i], (Integer) kvs[i + 1]); + } + return map; + } + + private static Map> fieldToColumns( + int fieldId0, List columns0, int fieldId1, List columns1) { + Map> map = new TreeMap<>(); + map.put(fieldId0, columns0); + map.put(fieldId1, columns1); + return map; + } + + private static InternalMap stringKeyMap(Object... kvs) { + Map map = new LinkedHashMap<>(); + for (int i = 0; i < kvs.length; i += 2) { + map.put(BinaryString.fromString((String) kvs[i]), kvs[i + 1]); + } + return new GenericMap(map); + } + + private static InternalMap intKeyMap(Object... kvs) { + Map map = new LinkedHashMap<>(); + for (int i = 0; i < kvs.length; i += 2) { + map.put(kvs[i], kvs[i + 1]); + } + return new GenericMap(map); + } + + private static class InMemoryReader implements FileRecordReader { + + private final List rows; + private boolean read; + + private InMemoryReader(List rows) { + this.rows = rows; + } + + @Nullable + @Override + public FileRecordIterator readBatch() { + if (read) { + return null; + } + read = true; + return new InMemoryIterator(rows); + } + + @Override + public void close() {} + } + + private static class InMemoryIterator implements FileRecordIterator { + + private final List rows; + private int index; + + private InMemoryIterator(List rows) { + this.rows = rows; + } + + @Override + public long returnedPosition() { + return index - 1; + } + + @Override + public Path filePath() { + return new Path("memory"); + } + + @Nullable + @Override + public InternalRow next() { + return index < rows.size() ? rows.get(index++) : null; + } + + @Override + public void releaseBatch() {} + } +} diff --git a/paimon-common/src/test/java/org/apache/paimon/data/shredding/MapSharedShreddingUtilsTest.java b/paimon-common/src/test/java/org/apache/paimon/data/shredding/MapSharedShreddingUtilsTest.java index 29314c6a00ad..6c139fdf811e 100644 --- a/paimon-common/src/test/java/org/apache/paimon/data/shredding/MapSharedShreddingUtilsTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/data/shredding/MapSharedShreddingUtilsTest.java @@ -31,7 +31,9 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.TreeMap; +import java.util.TreeSet; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -186,6 +188,80 @@ void testLogicalToPhysicalSchemaNoShreddingColumns() { .isEqualTo(logical); } + @Test + void testGetPhysicalColumnIndices() { + MapSharedShreddingFieldMeta fieldMeta = + new MapSharedShreddingFieldMeta( + nameToId("age", 0, "name", 1), + fieldToColumns(0, Arrays.asList(0, 2), 1, Arrays.asList(1)), + new HashSet<>(), + 3, + 2); + + assertThat(MapSharedShreddingUtils.getPhysicalColumnIndices(fieldMeta, "age")) + .containsExactly(0, 2); + assertThat(MapSharedShreddingUtils.getPhysicalColumnIndices(fieldMeta, "name")) + .containsExactly(1); + + assertThatThrownBy( + () -> MapSharedShreddingUtils.getPhysicalColumnIndices(fieldMeta, "score")) + .hasMessageContaining("cannot find field score in map shared shredding meta"); + + MapSharedShreddingFieldMeta missingColumnsMeta = + new MapSharedShreddingFieldMeta( + nameToId("age", 0), new TreeMap<>(), new HashSet<>(), 3, 2); + assertThatThrownBy( + () -> + MapSharedShreddingUtils.getPhysicalColumnIndices( + missingColumnsMeta, "age")) + .hasMessageContaining( + "cannot find field id 0 in field_to_columns in map shared shredding meta"); + } + + @Test + void testIsOverflowField() { + MapSharedShreddingFieldMeta fieldMeta = + new MapSharedShreddingFieldMeta( + nameToId("age", 0, "name", 1), + fieldToColumns(0, Arrays.asList(0), 1, Arrays.asList(1)), + new TreeSet<>(Arrays.asList(1)), + 2, + 2); + + assertThat(MapSharedShreddingUtils.isOverflowField(fieldMeta, "age")).isFalse(); + assertThat(MapSharedShreddingUtils.isOverflowField(fieldMeta, "name")).isTrue(); + assertThatThrownBy(() -> MapSharedShreddingUtils.isOverflowField(fieldMeta, "score")) + .hasMessageContaining("cannot find field score in map shared shredding meta"); + } + + @Test + void testBuildSpecificPhysicalStructType() { + Set physicalColumnIds = new HashSet<>(Arrays.asList(3, 1)); + + RowType physicalType = + (RowType) + MapSharedShreddingUtils.buildSpecificPhysicalStructType( + DataTypes.BIGINT().notNull(), physicalColumnIds, true); + assertThat(physicalType.getFieldNames()) + .containsExactly("__field_mapping", "__col_1", "__col_3", "__overflow"); + assertThat(physicalType.getFields()).extracting(DataField::id).containsExactly(0, 1, 2, 3); + assertThat(physicalType.getField("__field_mapping").type()) + .isEqualTo(DataTypes.ARRAY(DataTypes.INT())); + assertThat(physicalType.getField("__col_1").type()) + .isEqualTo(DataTypes.BIGINT().notNull()); + assertThat(physicalType.getField("__col_3").type()) + .isEqualTo(DataTypes.BIGINT().notNull()); + assertThat(physicalType.getField("__overflow").type()) + .isEqualTo(DataTypes.MAP(DataTypes.INT(), DataTypes.BIGINT().notNull())); + + RowType physicalTypeWithoutOverflow = + (RowType) + MapSharedShreddingUtils.buildSpecificPhysicalStructType( + DataTypes.BIGINT(), physicalColumnIds, false); + assertThat(physicalTypeWithoutOverflow.getFieldNames()) + .containsExactly("__field_mapping", "__col_1", "__col_3"); + } + @Test void testMetadataRoundtrip() { Map nameToId = new TreeMap<>(); @@ -330,4 +406,26 @@ void testPhysicalColumnName() { assertThat(MapSharedShreddingDefine.physicalColumnName(1)).isEqualTo("__col_1"); assertThat(MapSharedShreddingDefine.physicalColumnName(99)).isEqualTo("__col_99"); } + + private static Map nameToId(Object... kvs) { + Map map = new TreeMap<>(); + for (int i = 0; i < kvs.length; i += 2) { + map.put((String) kvs[i], (Integer) kvs[i + 1]); + } + return map; + } + + private static Map> fieldToColumns( + int fieldId, List columns) { + Map> map = new TreeMap<>(); + map.put(fieldId, columns); + return map; + } + + private static Map> fieldToColumns( + int fieldId0, List columns0, int fieldId1, List columns1) { + Map> map = fieldToColumns(fieldId0, columns0); + map.put(fieldId1, columns1); + return map; + } } From 773a3c9af610e1fed461ff4ea757dd636d99b6e2 Mon Sep 17 00:00:00 2001 From: "lisizhuo.lsz" Date: Fri, 26 Jun 2026 17:56:43 +0800 Subject: [PATCH 2/4] fix comment --- .../shredding/MapSharedShreddingReader.java | 13 ++- .../shredding/MapSharedShreddingUtils.java | 2 +- .../MapSharedShreddingReaderTest.java | 83 +++++++++++++++++++ .../MapSharedShreddingUtilsTest.java | 12 ++- 4 files changed, 99 insertions(+), 11 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/data/shredding/MapSharedShreddingReader.java b/paimon-common/src/main/java/org/apache/paimon/data/shredding/MapSharedShreddingReader.java index 1c58d660c16f..32ec301f93e3 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/shredding/MapSharedShreddingReader.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/shredding/MapSharedShreddingReader.java @@ -272,8 +272,15 @@ private static InternalMap rebuildLogicalMap( } InternalArray fieldMapping = physicalRow.getArray(0); Map result = new LinkedHashMap<>(); - int numMappedColumns = Math.min(context.numColumns, fieldMapping.size()); - for (int column = 0; column < numMappedColumns; column++) { + if (fieldMapping.size() != context.numColumns) { + throw new IllegalArgumentException( + "Shared-shredding field mapping size " + + fieldMapping.size() + + " does not match metadata num columns " + + context.numColumns + + "."); + } + for (int column = 0; column < context.numColumns; column++) { if (fieldMapping.isNullAt(column)) { throw new IllegalArgumentException( "Shared-shredding field mapping element cannot be null."); @@ -294,7 +301,7 @@ private static InternalMap rebuildLogicalMap( + MapSharedShreddingDefine.physicalColumnName(column) + "."); } - // TODO Support rebuilding in the user requested selected-key order once key-level + // TODO(lisizhuo.lsz): Support rebuilding in the user requested selected-key order once key-level // projection is pushed down. Full map reads currently follow the physical/metadata // layout order. result.put(fieldName, context.valueGetters[column].getFieldOrNull(physicalRow)); diff --git a/paimon-common/src/main/java/org/apache/paimon/data/shredding/MapSharedShreddingUtils.java b/paimon-common/src/main/java/org/apache/paimon/data/shredding/MapSharedShreddingUtils.java index cc5dab13fdc2..ecb5efe2ccc0 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/shredding/MapSharedShreddingUtils.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/shredding/MapSharedShreddingUtils.java @@ -137,7 +137,7 @@ public static boolean isOverflowField(MapSharedShreddingFieldMeta fieldMeta, Str return fieldMeta.overflowFieldSet().contains(fieldId); } - public static DataType buildSpecificPhysicalStructType( + public static RowType buildSpecificPhysicalRowType( DataType valueType, Set physicalColumnIds, boolean includeOverflow) { RowType.Builder builder = RowType.builder(); builder.field(MapSharedShreddingDefine.FIELD_MAPPING, new ArrayType(new IntType())); diff --git a/paimon-common/src/test/java/org/apache/paimon/data/shredding/MapSharedShreddingReaderTest.java b/paimon-common/src/test/java/org/apache/paimon/data/shredding/MapSharedShreddingReaderTest.java index 84cae7b981ad..9e1c3e96c334 100644 --- a/paimon-common/src/test/java/org/apache/paimon/data/shredding/MapSharedShreddingReaderTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/data/shredding/MapSharedShreddingReaderTest.java @@ -122,6 +122,82 @@ void testRebuildLogicalMap() throws IOException { } } + @Test + void testRebuildMultipleLogicalMaps() throws IOException { + RowType logicalType = + DataTypes.ROW( + DataTypes.FIELD(0, "id", DataTypes.INT()), + DataTypes.FIELD( + 1, + "tags", + DataTypes.MAP(DataTypes.STRING(), DataTypes.BIGINT())), + DataTypes.FIELD( + 2, + "plain", + DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())), + DataTypes.FIELD( + 3, + "attrs", + DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))); + Map metas = new TreeMap<>(); + metas.put( + "tags", + new MapSharedShreddingFieldMeta( + nameToId("a", 0, "b", 1, "c", 2), + fieldToColumns( + 0, Collections.singletonList(0), + 1, Collections.singletonList(1)), + new TreeSet<>(Collections.singletonList(2)), + 2, + 3)); + metas.put( + "attrs", + new MapSharedShreddingFieldMeta( + nameToId("x", 10, "y", 11), + fieldToColumns( + 10, Collections.singletonList(0), + 11, Collections.singletonList(1)), + new TreeSet(), + 2, + 2)); + + List physicalRows = + Collections.singletonList( + GenericRow.of( + 1, + GenericRow.of( + new GenericArray(new int[] {0, -1}), + 10L, + null, + intKeyMap(2, 30L)), + stringKeyMap("plain-key", BinaryString.fromString("plain-value")), + GenericRow.of( + new GenericArray(new int[] {11, 10}), + BinaryString.fromString("value-y"), + null))); + + try (MapSharedShreddingReader reader = + new MapSharedShreddingReader( + new InMemoryReader(physicalRows), logicalType, metas)) { + FileRecordIterator batch = reader.readBatch(); + assertThat(batch).isNotNull(); + + InternalRow row = batch.next(); + assertThat(row.getInt(0)).isEqualTo(1); + assertThat(row.getMap(1)).isEqualTo(stringKeyMap("a", 10L, "c", 30L)); + assertThat(row.getMap(2)) + .isEqualTo( + stringKeyMap( + "plain-key", BinaryString.fromString("plain-value"))); + assertThat(row.getMap(3)) + .isEqualTo(stringKeyMap("y", BinaryString.fromString("value-y"), "x", null)); + + assertThat(batch.next()).isNull(); + batch.releaseBatch(); + assertThat(reader.readBatch()).isNull(); + } + } + @Test void testInvalidNullFieldMapping() throws IOException { assertInvalidPhysicalTags( @@ -136,6 +212,13 @@ void testInvalidNullFieldMappingElement() throws IOException { "Shared-shredding field mapping element cannot be null"); } + @Test + void testInvalidFieldMappingSize() throws IOException { + assertInvalidPhysicalTags( + GenericRow.of(new GenericArray(new int[] {0}), 10L, null, null), + "Shared-shredding field mapping size 1 does not match metadata num columns 2"); + } + @Test void testInvalidUnknownFieldId() throws IOException { assertInvalidPhysicalTags( diff --git a/paimon-common/src/test/java/org/apache/paimon/data/shredding/MapSharedShreddingUtilsTest.java b/paimon-common/src/test/java/org/apache/paimon/data/shredding/MapSharedShreddingUtilsTest.java index 6c139fdf811e..ac656efcd11d 100644 --- a/paimon-common/src/test/java/org/apache/paimon/data/shredding/MapSharedShreddingUtilsTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/data/shredding/MapSharedShreddingUtilsTest.java @@ -235,13 +235,12 @@ void testIsOverflowField() { } @Test - void testBuildSpecificPhysicalStructType() { + void testBuildSpecificPhysicalRowType() { Set physicalColumnIds = new HashSet<>(Arrays.asList(3, 1)); RowType physicalType = - (RowType) - MapSharedShreddingUtils.buildSpecificPhysicalStructType( - DataTypes.BIGINT().notNull(), physicalColumnIds, true); + MapSharedShreddingUtils.buildSpecificPhysicalRowType( + DataTypes.BIGINT().notNull(), physicalColumnIds, true); assertThat(physicalType.getFieldNames()) .containsExactly("__field_mapping", "__col_1", "__col_3", "__overflow"); assertThat(physicalType.getFields()).extracting(DataField::id).containsExactly(0, 1, 2, 3); @@ -255,9 +254,8 @@ void testBuildSpecificPhysicalStructType() { .isEqualTo(DataTypes.MAP(DataTypes.INT(), DataTypes.BIGINT().notNull())); RowType physicalTypeWithoutOverflow = - (RowType) - MapSharedShreddingUtils.buildSpecificPhysicalStructType( - DataTypes.BIGINT(), physicalColumnIds, false); + MapSharedShreddingUtils.buildSpecificPhysicalRowType( + DataTypes.BIGINT(), physicalColumnIds, false); assertThat(physicalTypeWithoutOverflow.getFieldNames()) .containsExactly("__field_mapping", "__col_1", "__col_3"); } From 919e89070f5c61b1858746ca14c2bb337aa517f2 Mon Sep 17 00:00:00 2001 From: "lisizhuo.lsz" Date: Sun, 28 Jun 2026 22:38:31 +0800 Subject: [PATCH 3/4] fix --- .../shredding/MapSharedShreddingReader.java | 6 ++-- .../MapSharedShreddingReaderTest.java | 30 +++++-------------- .../MapSharedShreddingUtilsTest.java | 9 ++---- 3 files changed, 13 insertions(+), 32 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/data/shredding/MapSharedShreddingReader.java b/paimon-common/src/main/java/org/apache/paimon/data/shredding/MapSharedShreddingReader.java index 32ec301f93e3..b5425758695f 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/shredding/MapSharedShreddingReader.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/shredding/MapSharedShreddingReader.java @@ -301,7 +301,8 @@ private static InternalMap rebuildLogicalMap( + MapSharedShreddingDefine.physicalColumnName(column) + "."); } - // TODO(lisizhuo.lsz): Support rebuilding in the user requested selected-key order once key-level + // TODO(lisizhuo.lsz): Support rebuilding in the user requested selected-key order once + // key-level // projection is pushed down. Full map reads currently follow the physical/metadata // layout order. result.put(fieldName, context.valueGetters[column].getFieldOrNull(physicalRow)); @@ -347,8 +348,7 @@ private SharedShreddingContext(MapSharedShreddingFieldMeta fieldMeta, DataType f this.valueGetters = new InternalRow.FieldGetter[fieldMeta.numColumns()]; for (int i = 0; i < fieldMeta.numColumns(); i++) { // plus 1 to skip __field_mapping - this.valueGetters[i] = - InternalRow.createFieldGetter(mapType.getValueType(), i + 1); + this.valueGetters[i] = InternalRow.createFieldGetter(mapType.getValueType(), i + 1); } this.overflowValueGetter = InternalArray.createElementGetter(mapType.getValueType()); this.numColumns = fieldMeta.numColumns(); diff --git a/paimon-common/src/test/java/org/apache/paimon/data/shredding/MapSharedShreddingReaderTest.java b/paimon-common/src/test/java/org/apache/paimon/data/shredding/MapSharedShreddingReaderTest.java index 9e1c3e96c334..2a1dead45056 100644 --- a/paimon-common/src/test/java/org/apache/paimon/data/shredding/MapSharedShreddingReaderTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/data/shredding/MapSharedShreddingReaderTest.java @@ -72,11 +72,7 @@ void testRebuildLogicalMap() throws IOException { Arrays.asList( GenericRow.of( 1, - GenericRow.of( - new GenericArray(new int[] {0, 1}), - 10L, - 20L, - null)), + GenericRow.of(new GenericArray(new int[] {0, 1}), 10L, 20L, null)), GenericRow.of( 2, GenericRow.of( @@ -87,10 +83,7 @@ void testRebuildLogicalMap() throws IOException { GenericRow.of( 3, GenericRow.of( - new GenericArray(new int[] {1, -1}), - null, - null, - null)), + new GenericArray(new int[] {1, -1}), null, null, null)), GenericRow.of(4, null)); try (MapSharedShreddingReader reader = @@ -128,17 +121,11 @@ void testRebuildMultipleLogicalMaps() throws IOException { DataTypes.ROW( DataTypes.FIELD(0, "id", DataTypes.INT()), DataTypes.FIELD( - 1, - "tags", - DataTypes.MAP(DataTypes.STRING(), DataTypes.BIGINT())), + 1, "tags", DataTypes.MAP(DataTypes.STRING(), DataTypes.BIGINT())), DataTypes.FIELD( - 2, - "plain", - DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())), + 2, "plain", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())), DataTypes.FIELD( - 3, - "attrs", - DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))); + 3, "attrs", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))); Map metas = new TreeMap<>(); metas.put( "tags", @@ -186,9 +173,7 @@ void testRebuildMultipleLogicalMaps() throws IOException { assertThat(row.getInt(0)).isEqualTo(1); assertThat(row.getMap(1)).isEqualTo(stringKeyMap("a", 10L, "c", 30L)); assertThat(row.getMap(2)) - .isEqualTo( - stringKeyMap( - "plain-key", BinaryString.fromString("plain-value"))); + .isEqualTo(stringKeyMap("plain-key", BinaryString.fromString("plain-value"))); assertThat(row.getMap(3)) .isEqualTo(stringKeyMap("y", BinaryString.fromString("value-y"), "x", null)); @@ -229,8 +214,7 @@ void testInvalidUnknownFieldId() throws IOException { @Test void testInvalidUnknownOverflowFieldId() throws IOException { assertInvalidPhysicalTags( - GenericRow.of( - new GenericArray(new int[] {0, -1}), 10L, null, intKeyMap(99, 20L)), + GenericRow.of(new GenericArray(new int[] {0, -1}), 10L, null, intKeyMap(99, 20L)), "Cannot find shared-shredding field id 99 in metadata"); } diff --git a/paimon-common/src/test/java/org/apache/paimon/data/shredding/MapSharedShreddingUtilsTest.java b/paimon-common/src/test/java/org/apache/paimon/data/shredding/MapSharedShreddingUtilsTest.java index ac656efcd11d..2e3ecd1a8686 100644 --- a/paimon-common/src/test/java/org/apache/paimon/data/shredding/MapSharedShreddingUtilsTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/data/shredding/MapSharedShreddingUtilsTest.java @@ -246,10 +246,8 @@ void testBuildSpecificPhysicalRowType() { assertThat(physicalType.getFields()).extracting(DataField::id).containsExactly(0, 1, 2, 3); assertThat(physicalType.getField("__field_mapping").type()) .isEqualTo(DataTypes.ARRAY(DataTypes.INT())); - assertThat(physicalType.getField("__col_1").type()) - .isEqualTo(DataTypes.BIGINT().notNull()); - assertThat(physicalType.getField("__col_3").type()) - .isEqualTo(DataTypes.BIGINT().notNull()); + assertThat(physicalType.getField("__col_1").type()).isEqualTo(DataTypes.BIGINT().notNull()); + assertThat(physicalType.getField("__col_3").type()).isEqualTo(DataTypes.BIGINT().notNull()); assertThat(physicalType.getField("__overflow").type()) .isEqualTo(DataTypes.MAP(DataTypes.INT(), DataTypes.BIGINT().notNull())); @@ -413,8 +411,7 @@ private static Map nameToId(Object... kvs) { return map; } - private static Map> fieldToColumns( - int fieldId, List columns) { + private static Map> fieldToColumns(int fieldId, List columns) { Map> map = new TreeMap<>(); map.put(fieldId, columns); return map; From fcedcbc16625df1041a9c435a9147eae46f51d32 Mon Sep 17 00:00:00 2001 From: "lisizhuo.lsz" Date: Sun, 28 Jun 2026 22:44:03 +0800 Subject: [PATCH 4/4] fix --- .../apache/paimon/data/shredding/MapSharedShreddingReader.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/data/shredding/MapSharedShreddingReader.java b/paimon-common/src/main/java/org/apache/paimon/data/shredding/MapSharedShreddingReader.java index b5425758695f..f8ef521b8004 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/shredding/MapSharedShreddingReader.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/shredding/MapSharedShreddingReader.java @@ -301,7 +301,7 @@ private static InternalMap rebuildLogicalMap( + MapSharedShreddingDefine.physicalColumnName(column) + "."); } - // TODO(lisizhuo.lsz): Support rebuilding in the user requested selected-key order once + // TODO: Support rebuilding in the user requested selected-key order once // key-level // projection is pushed down. Full map reads currently follow the physical/metadata // layout order.