diff --git a/paimon-common/src/main/java/org/apache/paimon/data/shredding/MapSharedShreddingReader.java b/paimon-common/src/main/java/org/apache/paimon/data/shredding/MapSharedShreddingReader.java new file mode 100644 index 000000000000..f8ef521b8004 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/data/shredding/MapSharedShreddingReader.java @@ -0,0 +1,362 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.data.shredding; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.Blob; +import org.apache.paimon.data.Decimal; +import org.apache.paimon.data.GenericMap; +import org.apache.paimon.data.InternalArray; +import org.apache.paimon.data.InternalMap; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.InternalVector; +import org.apache.paimon.data.Timestamp; +import org.apache.paimon.data.variant.Variant; +import org.apache.paimon.fs.Path; +import org.apache.paimon.reader.FileRecordIterator; +import org.apache.paimon.reader.FileRecordReader; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.MapType; +import org.apache.paimon.types.RowKind; +import org.apache.paimon.types.RowType; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.LinkedHashMap; +import java.util.Map; + +/** + * A reader wrapper that rebuilds logical MAP values from shared-shredding physical ROW values. + * + *

The wrapped format reader reads the physical schema stored in a file. This reader presents the + * original logical schema to upper layers by lazily converting only shared-shredding MAP fields + * when {@link InternalRow#getMap(int)} is called. + */ +public class MapSharedShreddingReader implements FileRecordReader { + + private final FileRecordReader reader; + private final RowType logicalType; + private final Map contextByFieldIndex; + + public MapSharedShreddingReader( + FileRecordReader reader, + RowType logicalType, + Map fieldMetas) { + this.reader = reader; + this.logicalType = logicalType; + this.contextByFieldIndex = createContexts(logicalType, fieldMetas); + } + + public static Map readSharedShreddingMetas( + Map> fieldMetadata) { + Map metas = new LinkedHashMap<>(); + for (Map.Entry> entry : fieldMetadata.entrySet()) { + if (MapSharedShreddingUtils.hasShreddingMetadata(entry.getValue())) { + metas.put( + entry.getKey(), + MapSharedShreddingUtils.deserializeMetadata( + entry.getValue(), + MapSharedShreddingDefine.DEFAULT_DICT_COMPRESSION)); + } + } + return metas; + } + + private static Map createContexts( + RowType logicalType, Map fieldMetas) { + Map contexts = new LinkedHashMap<>(); + for (int i = 0; i < logicalType.getFieldCount(); i++) { + DataField field = logicalType.getFields().get(i); + MapSharedShreddingFieldMeta fieldMeta = fieldMetas.get(field.name()); + if (fieldMeta != null) { + contexts.put(i, new SharedShreddingContext(fieldMeta, field.type())); + } + } + return contexts; + } + + @Nullable + @Override + public FileRecordIterator readBatch() throws IOException { + FileRecordIterator iterator = reader.readBatch(); + if (iterator == null) { + return null; + } + return new SharedShreddingIterator(iterator); + } + + @Override + public void close() throws IOException { + reader.close(); + } + + private class SharedShreddingIterator implements FileRecordIterator { + + private final FileRecordIterator iterator; + + private SharedShreddingIterator(FileRecordIterator iterator) { + this.iterator = iterator; + } + + @Override + public long returnedPosition() { + return iterator.returnedPosition(); + } + + @Override + public Path filePath() { + return iterator.filePath(); + } + + @Nullable + @Override + public InternalRow next() throws IOException { + InternalRow row = iterator.next(); + if (row == null) { + return null; + } + return new SharedShreddingRow(row); + } + + @Override + public void releaseBatch() { + iterator.releaseBatch(); + } + } + + private class SharedShreddingRow implements InternalRow { + + private final InternalRow row; + + private SharedShreddingRow(InternalRow row) { + this.row = row; + } + + @Override + public int getFieldCount() { + return logicalType.getFieldCount(); + } + + @Override + public RowKind getRowKind() { + return row.getRowKind(); + } + + @Override + public void setRowKind(RowKind kind) { + row.setRowKind(kind); + } + + @Override + public boolean isNullAt(int pos) { + return row.isNullAt(pos); + } + + @Override + public boolean getBoolean(int pos) { + return row.getBoolean(pos); + } + + @Override + public byte getByte(int pos) { + return row.getByte(pos); + } + + @Override + public short getShort(int pos) { + return row.getShort(pos); + } + + @Override + public int getInt(int pos) { + return row.getInt(pos); + } + + @Override + public long getLong(int pos) { + return row.getLong(pos); + } + + @Override + public float getFloat(int pos) { + return row.getFloat(pos); + } + + @Override + public double getDouble(int pos) { + return row.getDouble(pos); + } + + @Override + public BinaryString getString(int pos) { + return row.getString(pos); + } + + @Override + public Decimal getDecimal(int pos, int precision, int scale) { + return row.getDecimal(pos, precision, scale); + } + + @Override + public Timestamp getTimestamp(int pos, int precision) { + return row.getTimestamp(pos, precision); + } + + @Override + public byte[] getBinary(int pos) { + return row.getBinary(pos); + } + + @Override + public Variant getVariant(int pos) { + return row.getVariant(pos); + } + + @Override + public Blob getBlob(int pos) { + return row.getBlob(pos); + } + + @Override + public InternalArray getArray(int pos) { + return row.getArray(pos); + } + + @Override + public InternalVector getVector(int pos) { + return row.getVector(pos); + } + + @Override + public InternalMap getMap(int pos) { + SharedShreddingContext context = contextByFieldIndex.get(pos); + if (context == null) { + return row.getMap(pos); + } + if (row.isNullAt(pos)) { + return null; + } + InternalRow physicalRow = row.getRow(pos, context.numPhysicalFields); + return rebuildLogicalMap(physicalRow, context); + } + + @Override + public InternalRow getRow(int pos, int numFields) { + return row.getRow(pos, numFields); + } + } + + private static InternalMap rebuildLogicalMap( + InternalRow physicalRow, SharedShreddingContext context) { + if (physicalRow.isNullAt(0)) { + throw new IllegalArgumentException( + "Shared-shredding field mapping cannot be null in a non-null physical row."); + } + InternalArray fieldMapping = physicalRow.getArray(0); + Map result = new LinkedHashMap<>(); + if (fieldMapping.size() != context.numColumns) { + throw new IllegalArgumentException( + "Shared-shredding field mapping size " + + fieldMapping.size() + + " does not match metadata num columns " + + context.numColumns + + "."); + } + for (int column = 0; column < context.numColumns; column++) { + if (fieldMapping.isNullAt(column)) { + throw new IllegalArgumentException( + "Shared-shredding field mapping element cannot be null."); + } + int fieldId = fieldMapping.getInt(column); + if (fieldId < 0) { + continue; + } + BinaryString fieldName = context.nameById.get(fieldId); + if (fieldName == null) { + throw new IllegalArgumentException( + "Cannot find shared-shredding field id " + fieldId + " in metadata."); + } + int valuePosition = column + 1; + if (valuePosition >= physicalRow.getFieldCount()) { + throw new IllegalArgumentException( + "Cannot find shared-shredding physical column " + + MapSharedShreddingDefine.physicalColumnName(column) + + "."); + } + // TODO: Support rebuilding in the user requested selected-key order once + // key-level + // projection is pushed down. Full map reads currently follow the physical/metadata + // layout order. + result.put(fieldName, context.valueGetters[column].getFieldOrNull(physicalRow)); + } + if (context.overflowPosition < physicalRow.getFieldCount() + && !physicalRow.isNullAt(context.overflowPosition)) { + InternalMap overflow = physicalRow.getMap(context.overflowPosition); + InternalArray keys = overflow.keyArray(); + InternalArray values = overflow.valueArray(); + for (int i = 0; i < overflow.size(); i++) { + if (keys.isNullAt(i)) { + throw new IllegalArgumentException( + "Shared-shredding overflow field id cannot be null."); + } + int fieldId = keys.getInt(i); + BinaryString fieldName = context.nameById.get(fieldId); + if (fieldName == null) { + throw new IllegalArgumentException( + "Cannot find shared-shredding field id " + fieldId + " in metadata."); + } + result.put(fieldName, context.overflowValueGetter.getElementOrNull(values, i)); + } + } + return new GenericMap(result); + } + + private static class SharedShreddingContext { + + private final Map nameById; + private final InternalRow.FieldGetter[] valueGetters; + private final InternalArray.ElementGetter overflowValueGetter; + private final int numColumns; + private final int overflowPosition; + private final int numPhysicalFields; + + private SharedShreddingContext(MapSharedShreddingFieldMeta fieldMeta, DataType fieldType) { + MapType mapType = (MapType) fieldType; + this.nameById = new LinkedHashMap<>(); + for (Map.Entry entry : fieldMeta.nameToId().entrySet()) { + // ordered by dict + this.nameById.put(entry.getValue(), BinaryString.fromString(entry.getKey())); + } + this.valueGetters = new InternalRow.FieldGetter[fieldMeta.numColumns()]; + for (int i = 0; i < fieldMeta.numColumns(); i++) { + // plus 1 to skip __field_mapping + this.valueGetters[i] = InternalRow.createFieldGetter(mapType.getValueType(), i + 1); + } + this.overflowValueGetter = InternalArray.createElementGetter(mapType.getValueType()); + this.numColumns = fieldMeta.numColumns(); + this.overflowPosition = fieldMeta.numColumns() + 1; + this.numPhysicalFields = + fieldMeta.overflowFieldSet().isEmpty() + ? fieldMeta.numColumns() + 1 + : fieldMeta.numColumns() + 2; + } + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/data/shredding/MapSharedShreddingUtils.java b/paimon-common/src/main/java/org/apache/paimon/data/shredding/MapSharedShreddingUtils.java index 6f0cd879c52a..ecb5efe2ccc0 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/shredding/MapSharedShreddingUtils.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/shredding/MapSharedShreddingUtils.java @@ -111,6 +111,45 @@ public static Map buildColumnToNumColumns( return fieldToNumColumns; } + public static List getPhysicalColumnIndices( + MapSharedShreddingFieldMeta fieldMeta, String fieldName) { + Integer fieldId = fieldMeta.nameToId().get(fieldName); + if (fieldId == null) { + throw new IllegalArgumentException( + "cannot find field " + fieldName + " in map shared shredding meta"); + } + List columns = fieldMeta.fieldToColumns().get(fieldId); + if (columns == null) { + throw new IllegalArgumentException( + "cannot find field id " + + fieldId + + " in field_to_columns in map shared shredding meta"); + } + return columns; + } + + public static boolean isOverflowField(MapSharedShreddingFieldMeta fieldMeta, String fieldName) { + Integer fieldId = fieldMeta.nameToId().get(fieldName); + if (fieldId == null) { + throw new IllegalArgumentException( + "cannot find field " + fieldName + " in map shared shredding meta"); + } + return fieldMeta.overflowFieldSet().contains(fieldId); + } + + public static RowType buildSpecificPhysicalRowType( + DataType valueType, Set physicalColumnIds, boolean includeOverflow) { + RowType.Builder builder = RowType.builder(); + builder.field(MapSharedShreddingDefine.FIELD_MAPPING, new ArrayType(new IntType())); + for (Integer physicalColumnId : new TreeSet<>(physicalColumnIds)) { + builder.field(MapSharedShreddingDefine.physicalColumnName(physicalColumnId), valueType); + } + if (includeOverflow) { + builder.field(MapSharedShreddingDefine.OVERFLOW, new MapType(new IntType(), valueType)); + } + return builder.build(); + } + public static void serializeMetadata( MapSharedShreddingFieldMeta fieldMeta, String compression, diff --git a/paimon-common/src/test/java/org/apache/paimon/data/shredding/MapSharedShreddingReaderTest.java b/paimon-common/src/test/java/org/apache/paimon/data/shredding/MapSharedShreddingReaderTest.java new file mode 100644 index 000000000000..2a1dead45056 --- /dev/null +++ b/paimon-common/src/test/java/org/apache/paimon/data/shredding/MapSharedShreddingReaderTest.java @@ -0,0 +1,339 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.data.shredding; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericArray; +import org.apache.paimon.data.GenericMap; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalMap; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.fs.Path; +import org.apache.paimon.reader.FileRecordIterator; +import org.apache.paimon.reader.FileRecordReader; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; + +import org.junit.jupiter.api.Test; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.TreeSet; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link MapSharedShreddingReader}. */ +class MapSharedShreddingReaderTest { + + @Test + void testRebuildLogicalMap() throws IOException { + RowType logicalType = + DataTypes.ROW( + DataTypes.FIELD(0, "id", DataTypes.INT()), + DataTypes.FIELD( + 1, "tags", DataTypes.MAP(DataTypes.STRING(), DataTypes.BIGINT()))); + Map metas = new TreeMap<>(); + metas.put( + "tags", + new MapSharedShreddingFieldMeta( + nameToId("a", 0, "b", 1, "c", 2), + fieldToColumns( + 0, Collections.singletonList(0), + 1, Collections.singletonList(1)), + new TreeSet<>(Collections.singletonList(2)), + 2, + 3)); + + List physicalRows = + Arrays.asList( + GenericRow.of( + 1, + GenericRow.of(new GenericArray(new int[] {0, 1}), 10L, 20L, null)), + GenericRow.of( + 2, + GenericRow.of( + new GenericArray(new int[] {0, -1}), + 40L, + null, + intKeyMap(2, null))), + GenericRow.of( + 3, + GenericRow.of( + new GenericArray(new int[] {1, -1}), null, null, null)), + GenericRow.of(4, null)); + + try (MapSharedShreddingReader reader = + new MapSharedShreddingReader( + new InMemoryReader(physicalRows), logicalType, metas)) { + FileRecordIterator batch = reader.readBatch(); + assertThat(batch).isNotNull(); + + InternalRow row = batch.next(); + assertThat(row.getInt(0)).isEqualTo(1); + assertThat(row.getMap(1)).isEqualTo(stringKeyMap("a", 10L, "b", 20L)); + + row = batch.next(); + assertThat(row.getInt(0)).isEqualTo(2); + assertThat(row.getMap(1)).isEqualTo(stringKeyMap("a", 40L, "c", null)); + + row = batch.next(); + assertThat(row.getInt(0)).isEqualTo(3); + assertThat(row.getMap(1)).isEqualTo(stringKeyMap("b", null)); + + row = batch.next(); + assertThat(row.getInt(0)).isEqualTo(4); + assertThat(row.isNullAt(1)).isTrue(); + assertThat(row.getMap(1)).isNull(); + + assertThat(batch.next()).isNull(); + batch.releaseBatch(); + assertThat(reader.readBatch()).isNull(); + } + } + + @Test + void testRebuildMultipleLogicalMaps() throws IOException { + RowType logicalType = + DataTypes.ROW( + DataTypes.FIELD(0, "id", DataTypes.INT()), + DataTypes.FIELD( + 1, "tags", DataTypes.MAP(DataTypes.STRING(), DataTypes.BIGINT())), + DataTypes.FIELD( + 2, "plain", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())), + DataTypes.FIELD( + 3, "attrs", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))); + Map metas = new TreeMap<>(); + metas.put( + "tags", + new MapSharedShreddingFieldMeta( + nameToId("a", 0, "b", 1, "c", 2), + fieldToColumns( + 0, Collections.singletonList(0), + 1, Collections.singletonList(1)), + new TreeSet<>(Collections.singletonList(2)), + 2, + 3)); + metas.put( + "attrs", + new MapSharedShreddingFieldMeta( + nameToId("x", 10, "y", 11), + fieldToColumns( + 10, Collections.singletonList(0), + 11, Collections.singletonList(1)), + new TreeSet(), + 2, + 2)); + + List physicalRows = + Collections.singletonList( + GenericRow.of( + 1, + GenericRow.of( + new GenericArray(new int[] {0, -1}), + 10L, + null, + intKeyMap(2, 30L)), + stringKeyMap("plain-key", BinaryString.fromString("plain-value")), + GenericRow.of( + new GenericArray(new int[] {11, 10}), + BinaryString.fromString("value-y"), + null))); + + try (MapSharedShreddingReader reader = + new MapSharedShreddingReader( + new InMemoryReader(physicalRows), logicalType, metas)) { + FileRecordIterator batch = reader.readBatch(); + assertThat(batch).isNotNull(); + + InternalRow row = batch.next(); + assertThat(row.getInt(0)).isEqualTo(1); + assertThat(row.getMap(1)).isEqualTo(stringKeyMap("a", 10L, "c", 30L)); + assertThat(row.getMap(2)) + .isEqualTo(stringKeyMap("plain-key", BinaryString.fromString("plain-value"))); + assertThat(row.getMap(3)) + .isEqualTo(stringKeyMap("y", BinaryString.fromString("value-y"), "x", null)); + + assertThat(batch.next()).isNull(); + batch.releaseBatch(); + assertThat(reader.readBatch()).isNull(); + } + } + + @Test + void testInvalidNullFieldMapping() throws IOException { + assertInvalidPhysicalTags( + GenericRow.of(null, 10L, 20L, null), + "Shared-shredding field mapping cannot be null"); + } + + @Test + void testInvalidNullFieldMappingElement() throws IOException { + assertInvalidPhysicalTags( + GenericRow.of(new GenericArray(new Object[] {0, null}), 10L, 20L, null), + "Shared-shredding field mapping element cannot be null"); + } + + @Test + void testInvalidFieldMappingSize() throws IOException { + assertInvalidPhysicalTags( + GenericRow.of(new GenericArray(new int[] {0}), 10L, null, null), + "Shared-shredding field mapping size 1 does not match metadata num columns 2"); + } + + @Test + void testInvalidUnknownFieldId() throws IOException { + assertInvalidPhysicalTags( + GenericRow.of(new GenericArray(new int[] {0, 99}), 10L, 20L, null), + "Cannot find shared-shredding field id 99 in metadata"); + } + + @Test + void testInvalidUnknownOverflowFieldId() throws IOException { + assertInvalidPhysicalTags( + GenericRow.of(new GenericArray(new int[] {0, -1}), 10L, null, intKeyMap(99, 20L)), + "Cannot find shared-shredding field id 99 in metadata"); + } + + private static void assertInvalidPhysicalTags(InternalRow physicalTags, String message) + throws IOException { + RowType logicalType = + DataTypes.ROW( + DataTypes.FIELD(0, "id", DataTypes.INT()), + DataTypes.FIELD( + 1, "tags", DataTypes.MAP(DataTypes.STRING(), DataTypes.BIGINT()))); + try (MapSharedShreddingReader reader = + new MapSharedShreddingReader( + new InMemoryReader( + Collections.singletonList(GenericRow.of(1, physicalTags))), + logicalType, + metas())) { + FileRecordIterator batch = reader.readBatch(); + InternalRow row = batch.next(); + assertThatThrownBy(() -> row.getMap(1)).hasMessageContaining(message); + batch.releaseBatch(); + } + } + + private static Map metas() { + Map metas = new TreeMap<>(); + metas.put( + "tags", + new MapSharedShreddingFieldMeta( + nameToId("a", 0, "b", 1, "c", 2), + fieldToColumns( + 0, Collections.singletonList(0), + 1, Collections.singletonList(1)), + new TreeSet<>(Collections.singletonList(2)), + 2, + 3)); + return metas; + } + + private static Map nameToId(Object... kvs) { + Map map = new TreeMap<>(); + for (int i = 0; i < kvs.length; i += 2) { + map.put((String) kvs[i], (Integer) kvs[i + 1]); + } + return map; + } + + private static Map> fieldToColumns( + int fieldId0, List columns0, int fieldId1, List columns1) { + Map> map = new TreeMap<>(); + map.put(fieldId0, columns0); + map.put(fieldId1, columns1); + return map; + } + + private static InternalMap stringKeyMap(Object... kvs) { + Map map = new LinkedHashMap<>(); + for (int i = 0; i < kvs.length; i += 2) { + map.put(BinaryString.fromString((String) kvs[i]), kvs[i + 1]); + } + return new GenericMap(map); + } + + private static InternalMap intKeyMap(Object... kvs) { + Map map = new LinkedHashMap<>(); + for (int i = 0; i < kvs.length; i += 2) { + map.put(kvs[i], kvs[i + 1]); + } + return new GenericMap(map); + } + + private static class InMemoryReader implements FileRecordReader { + + private final List rows; + private boolean read; + + private InMemoryReader(List rows) { + this.rows = rows; + } + + @Nullable + @Override + public FileRecordIterator readBatch() { + if (read) { + return null; + } + read = true; + return new InMemoryIterator(rows); + } + + @Override + public void close() {} + } + + private static class InMemoryIterator implements FileRecordIterator { + + private final List rows; + private int index; + + private InMemoryIterator(List rows) { + this.rows = rows; + } + + @Override + public long returnedPosition() { + return index - 1; + } + + @Override + public Path filePath() { + return new Path("memory"); + } + + @Nullable + @Override + public InternalRow next() { + return index < rows.size() ? rows.get(index++) : null; + } + + @Override + public void releaseBatch() {} + } +} diff --git a/paimon-common/src/test/java/org/apache/paimon/data/shredding/MapSharedShreddingUtilsTest.java b/paimon-common/src/test/java/org/apache/paimon/data/shredding/MapSharedShreddingUtilsTest.java index 29314c6a00ad..2e3ecd1a8686 100644 --- a/paimon-common/src/test/java/org/apache/paimon/data/shredding/MapSharedShreddingUtilsTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/data/shredding/MapSharedShreddingUtilsTest.java @@ -31,7 +31,9 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.TreeMap; +import java.util.TreeSet; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -186,6 +188,76 @@ void testLogicalToPhysicalSchemaNoShreddingColumns() { .isEqualTo(logical); } + @Test + void testGetPhysicalColumnIndices() { + MapSharedShreddingFieldMeta fieldMeta = + new MapSharedShreddingFieldMeta( + nameToId("age", 0, "name", 1), + fieldToColumns(0, Arrays.asList(0, 2), 1, Arrays.asList(1)), + new HashSet<>(), + 3, + 2); + + assertThat(MapSharedShreddingUtils.getPhysicalColumnIndices(fieldMeta, "age")) + .containsExactly(0, 2); + assertThat(MapSharedShreddingUtils.getPhysicalColumnIndices(fieldMeta, "name")) + .containsExactly(1); + + assertThatThrownBy( + () -> MapSharedShreddingUtils.getPhysicalColumnIndices(fieldMeta, "score")) + .hasMessageContaining("cannot find field score in map shared shredding meta"); + + MapSharedShreddingFieldMeta missingColumnsMeta = + new MapSharedShreddingFieldMeta( + nameToId("age", 0), new TreeMap<>(), new HashSet<>(), 3, 2); + assertThatThrownBy( + () -> + MapSharedShreddingUtils.getPhysicalColumnIndices( + missingColumnsMeta, "age")) + .hasMessageContaining( + "cannot find field id 0 in field_to_columns in map shared shredding meta"); + } + + @Test + void testIsOverflowField() { + MapSharedShreddingFieldMeta fieldMeta = + new MapSharedShreddingFieldMeta( + nameToId("age", 0, "name", 1), + fieldToColumns(0, Arrays.asList(0), 1, Arrays.asList(1)), + new TreeSet<>(Arrays.asList(1)), + 2, + 2); + + assertThat(MapSharedShreddingUtils.isOverflowField(fieldMeta, "age")).isFalse(); + assertThat(MapSharedShreddingUtils.isOverflowField(fieldMeta, "name")).isTrue(); + assertThatThrownBy(() -> MapSharedShreddingUtils.isOverflowField(fieldMeta, "score")) + .hasMessageContaining("cannot find field score in map shared shredding meta"); + } + + @Test + void testBuildSpecificPhysicalRowType() { + Set physicalColumnIds = new HashSet<>(Arrays.asList(3, 1)); + + RowType physicalType = + MapSharedShreddingUtils.buildSpecificPhysicalRowType( + DataTypes.BIGINT().notNull(), physicalColumnIds, true); + assertThat(physicalType.getFieldNames()) + .containsExactly("__field_mapping", "__col_1", "__col_3", "__overflow"); + assertThat(physicalType.getFields()).extracting(DataField::id).containsExactly(0, 1, 2, 3); + assertThat(physicalType.getField("__field_mapping").type()) + .isEqualTo(DataTypes.ARRAY(DataTypes.INT())); + assertThat(physicalType.getField("__col_1").type()).isEqualTo(DataTypes.BIGINT().notNull()); + assertThat(physicalType.getField("__col_3").type()).isEqualTo(DataTypes.BIGINT().notNull()); + assertThat(physicalType.getField("__overflow").type()) + .isEqualTo(DataTypes.MAP(DataTypes.INT(), DataTypes.BIGINT().notNull())); + + RowType physicalTypeWithoutOverflow = + MapSharedShreddingUtils.buildSpecificPhysicalRowType( + DataTypes.BIGINT(), physicalColumnIds, false); + assertThat(physicalTypeWithoutOverflow.getFieldNames()) + .containsExactly("__field_mapping", "__col_1", "__col_3"); + } + @Test void testMetadataRoundtrip() { Map nameToId = new TreeMap<>(); @@ -330,4 +402,25 @@ void testPhysicalColumnName() { assertThat(MapSharedShreddingDefine.physicalColumnName(1)).isEqualTo("__col_1"); assertThat(MapSharedShreddingDefine.physicalColumnName(99)).isEqualTo("__col_99"); } + + private static Map nameToId(Object... kvs) { + Map map = new TreeMap<>(); + for (int i = 0; i < kvs.length; i += 2) { + map.put((String) kvs[i], (Integer) kvs[i + 1]); + } + return map; + } + + private static Map> fieldToColumns(int fieldId, List columns) { + Map> map = new TreeMap<>(); + map.put(fieldId, columns); + return map; + } + + private static Map> fieldToColumns( + int fieldId0, List columns0, int fieldId1, List columns1) { + Map> map = fieldToColumns(fieldId0, columns0); + map.put(fieldId1, columns1); + return map; + } }