diff --git a/paimon-arrow/src/test/java/org/apache/paimon/arrow/converter/ArrowVectorizedBatchConverterTest.java b/paimon-arrow/src/test/java/org/apache/paimon/arrow/converter/ArrowVectorizedBatchConverterTest.java index ec484de64bd8..ac4defe47928 100644 --- a/paimon-arrow/src/test/java/org/apache/paimon/arrow/converter/ArrowVectorizedBatchConverterTest.java +++ b/paimon-arrow/src/test/java/org/apache/paimon/arrow/converter/ArrowVectorizedBatchConverterTest.java @@ -20,13 +20,18 @@ import org.apache.paimon.arrow.ArrowUtils; import org.apache.paimon.arrow.writer.ArrowFieldWriter; +import org.apache.paimon.data.BinaryVector; +import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.InternalVector; import org.apache.paimon.data.columnar.ColumnVector; import org.apache.paimon.data.columnar.ColumnarVec; +import org.apache.paimon.data.columnar.RowToColumnConverter; import org.apache.paimon.data.columnar.VecColumnVector; import org.apache.paimon.data.columnar.VectorizedColumnBatch; import org.apache.paimon.data.columnar.heap.HeapFloatVector; +import org.apache.paimon.data.columnar.heap.HeapVectorColumnVector; +import org.apache.paimon.data.columnar.writable.WritableColumnVector; import org.apache.paimon.reader.VectorizedRecordIterator; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; @@ -96,6 +101,56 @@ public void releaseBatch() {} } } + @Test + public void testNullableVectorColumnFromRowToColumnConverter() { + RowType rowType = RowType.of(DataTypes.VECTOR(3, DataTypes.FLOAT())); + RowToColumnConverter rowToColumnConverter = new RowToColumnConverter(rowType); + + HeapFloatVector elementVector = new HeapFloatVector(6); + HeapVectorColumnVector vectorColumn = new HeapVectorColumnVector(2, elementVector, 3); + WritableColumnVector[] vectors = new WritableColumnVector[] {vectorColumn}; + rowToColumnConverter.convert(GenericRow.of((Object) null), vectors); + rowToColumnConverter.convert( + GenericRow.of(BinaryVector.fromPrimitiveArray(new float[] {1.0f, 2.0f, 3.0f})), + vectors); + + try (RootAllocator allocator = new RootAllocator()) { + VectorSchemaRoot vsr = ArrowUtils.createVectorSchemaRoot(rowType, allocator); + ArrowFieldWriter[] fieldWriters = ArrowUtils.createArrowFieldWriters(vsr, rowType); + VectorizedColumnBatch batch = + new VectorizedColumnBatch(new ColumnVector[] {vectorColumn}); + batch.setNumRows(2); + + ArrowVectorizedBatchConverter converter = + new ArrowVectorizedBatchConverter(vsr, fieldWriters); + converter.reset( + new VectorizedRecordIterator() { + @Override + public VectorizedColumnBatch batch() { + return batch; + } + + @Override + public InternalRow next() { + return null; + } + + @Override + public void releaseBatch() {} + }); + converter.next(2); + + FixedSizeListVector listVector = (FixedSizeListVector) vsr.getVector(0); + assertThat(listVector.isNull(0)).isTrue(); + assertThat(listVector.getObject(0)).isNull(); + @SuppressWarnings("unchecked") + List row1 = (List) listVector.getObject(1); + assertThat(row1).containsExactly(1.0f, 2.0f, 3.0f); + + converter.close(); + } + } + @Test public void testVectorColumnWriteWithPickedInColumn() { RowType rowType = RowType.of(DataTypes.VECTOR(2, DataTypes.FLOAT())); diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/RowToColumnConverter.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/RowToColumnConverter.java index de962ad86a39..f3fdba7b099a 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/RowToColumnConverter.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/RowToColumnConverter.java @@ -23,10 +23,12 @@ import org.apache.paimon.data.InternalArray; import org.apache.paimon.data.InternalMap; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.InternalVector; import org.apache.paimon.data.Timestamp; import org.apache.paimon.data.columnar.heap.HeapArrayVector; import org.apache.paimon.data.columnar.heap.HeapMapVector; import org.apache.paimon.data.columnar.heap.HeapRowVector; +import org.apache.paimon.data.columnar.heap.HeapVectorColumnVector; import org.apache.paimon.data.columnar.writable.WritableBooleanVector; import org.apache.paimon.data.columnar.writable.WritableByteVector; import org.apache.paimon.data.columnar.writable.WritableBytesVector; @@ -284,7 +286,31 @@ public TypeConverter visit(ArrayType arrayType) { @Override public TypeConverter visit(VectorType vectorType) { - throw new UnsupportedOperationException(); + TypeConverter elementConverter = + getConverterForType(vectorType.getElementType().notNull()); + return createConverter( + vectorType.isNullable(), + (row, column, cv) -> { + HeapVectorColumnVector vectorColumn = (HeapVectorColumnVector) cv; + if (vectorColumn.getVectorSize() != vectorType.getLength()) { + throw new IllegalArgumentException( + "Vector column length mismatch: expected " + + vectorType.getLength() + + " but got " + + vectorColumn.getVectorSize()); + } + + InternalVector values = row.getVector(column); + checkVectorLength(values, vectorType.getLength()); + checkVectorElementsNonNull(values); + vectorColumn.appendVector(); + + WritableColumnVector vectorData = + (WritableColumnVector) vectorColumn.getColumnVector(); + for (int i = 0; i < values.size(); i++) { + elementConverter.append(values, i, vectorData); + } + }); } @Override @@ -374,6 +400,25 @@ private static TypeConverter timestampConverter(boolean nullable, int precision) } }); } + + private static void checkVectorLength(InternalVector vector, int expectedLength) { + if (vector.size() != expectedLength) { + throw new IllegalArgumentException( + "Vector length mismatch: expected " + + expectedLength + + " but got " + + vector.size()); + } + } + + private static void checkVectorElementsNonNull(InternalVector vector) { + for (int i = 0; i < vector.size(); i++) { + if (vector.isNullAt(i)) { + throw new UnsupportedOperationException( + "Vector elements must not be null."); + } + } + } } } } diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapVectorColumnVector.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapVectorColumnVector.java new file mode 100644 index 000000000000..1d79c4c25ff4 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapVectorColumnVector.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.data.columnar.heap; + +import org.apache.paimon.data.InternalVector; +import org.apache.paimon.data.columnar.ColumnVector; +import org.apache.paimon.data.columnar.ColumnarVec; +import org.apache.paimon.data.columnar.VecColumnVector; +import org.apache.paimon.data.columnar.writable.WritableColumnVector; + +/** Heap column vector for VectorType. */ +public class HeapVectorColumnVector extends AbstractArrayBasedVector implements VecColumnVector { + + private final int vectorSize; + + public HeapVectorColumnVector(int len, ColumnVector vector, int vectorSize) { + super(len, new ColumnVector[] {vector}); + if (vectorSize <= 0) { + throw new IllegalArgumentException("Vector size must be positive."); + } + this.vectorSize = vectorSize; + } + + public void appendVector() { + reserve(elementsAppended + 1); + long offset = (long) elementsAppended * vectorSize; + reserveChild(offset + vectorSize); + putOffsetLength(elementsAppended, offset, vectorSize); + elementsAppended++; + } + + @Override + public void appendNull() { + int index = elementsAppended; + appendVector(); + setNullAt(index); + + ColumnVector child = children[0]; + if (child instanceof WritableColumnVector) { + WritableColumnVector writableChild = (WritableColumnVector) child; + int offset = (int) offsets[index]; + writableChild.setNulls(offset, vectorSize); + writableChild.addElementsAppended(vectorSize); + } + } + + @Override + public InternalVector getVector(int i) { + long offset = offsets[i]; + long length = lengths[i]; + if (length != vectorSize) { + throw new IllegalArgumentException( + "Vector length mismatch: expected " + vectorSize + " but got " + length); + } + return ColumnarVec.DEFAULT_FACTORY.create(children[0], (int) offset, (int) length); + } + + @Override + public ColumnVector getColumnVector() { + return children[0]; + } + + @Override + public int getVectorSize() { + return vectorSize; + } + + private void reserveChild(long requiredCapacity) { + if (requiredCapacity > Integer.MAX_VALUE) { + throw new UnsupportedOperationException( + "Cannot allocate " + requiredCapacity + " vector elements"); + } + ColumnVector child = children[0]; + if (child instanceof WritableColumnVector) { + ((WritableColumnVector) child).reserve((int) requiredCapacity); + } + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/data/serializer/RowCompactedSerializer.java b/paimon-common/src/main/java/org/apache/paimon/data/serializer/RowCompactedSerializer.java index 30c93c30b290..4ceffc8f5f65 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/serializer/RowCompactedSerializer.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/serializer/RowCompactedSerializer.java @@ -22,12 +22,14 @@ import org.apache.paimon.data.BinaryArray; import org.apache.paimon.data.BinaryMap; import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.BinaryVector; import org.apache.paimon.data.Decimal; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalArray; import org.apache.paimon.data.InternalMap; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.InternalRow.FieldGetter; +import org.apache.paimon.data.InternalVector; import org.apache.paimon.data.Timestamp; import org.apache.paimon.data.variant.Variant; import org.apache.paimon.io.DataInputView; @@ -37,6 +39,8 @@ import org.apache.paimon.types.DataType; import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; +import org.apache.paimon.types.VectorType; +import org.apache.paimon.utils.TypeCheckUtils; import org.apache.paimon.utils.VarLengthIntUtils; import javax.annotation.Nullable; @@ -164,6 +168,7 @@ public InternalRow deserialize(byte[] bytes) { } public Comparator createSliceComparator() { + checkComparableFields(rowType); return new SliceComparator(rowType); } @@ -235,6 +240,18 @@ private static FieldWriter createFieldWriter(DataType fieldType) { (InternalArray) value, (InternalArraySerializer) arraySerializer); break; + case VECTOR: + VectorType vectorType = (VectorType) fieldType; + InternalVectorSerializer vectorSerializer = + new InternalVectorSerializer( + vectorType.getElementType(), vectorType.getLength()); + fieldWriter = + (writer, pos, value) -> { + InternalVector vector = (InternalVector) value; + checkVectorLength(vector, vectorType.getLength()); + writer.writeVector(vector, vectorSerializer); + }; + break; case MULTISET: case MAP: Serializer mapSerializer = InternalSerializers.create(fieldType); @@ -329,6 +346,10 @@ private static FieldReader createFieldReader(DataType fieldType) { case ARRAY: fieldReader = (reader, pos) -> reader.readArray(); break; + case VECTOR: + VectorType vectorType = (VectorType) fieldType; + fieldReader = (reader, pos) -> reader.readVector(vectorType.getLength()); + break; case MULTISET: case MAP: fieldReader = (reader, pos) -> reader.readMap(); @@ -356,6 +377,27 @@ private static FieldReader createFieldReader(DataType fieldType) { }; } + private static void checkVectorLength(InternalVector vector, int expectedLength) { + if (vector.size() != expectedLength) { + throw new IllegalArgumentException( + "Vector length mismatch: expected " + + expectedLength + + " but got " + + vector.size()); + } + } + + private static void checkComparableFields(RowType rowType) { + for (int i = 0; i < rowType.getFieldCount(); i++) { + DataType type = rowType.getTypeAt(i); + checkArgument( + TypeCheckUtils.isComparable(type), + "Field %s with type %s is not comparable in slice comparator.", + rowType.getFields().get(i).name(), + type); + } + } + private interface FieldWriter extends Serializable { void writeField(RowWriter writer, int pos, Object value); } @@ -467,6 +509,11 @@ private void writeArray(InternalArray value, InternalArraySerializer serializer) writeSegments(binary.getSegments(), binary.getOffset(), binary.getSizeInBytes()); } + private void writeVector(InternalVector value, InternalVectorSerializer serializer) { + BinaryVector binary = serializer.toBinaryVector(value); + writeSegments(binary.getSegments(), binary.getOffset(), binary.getSizeInBytes()); + } + private void writeMap(InternalMap value, InternalMapSerializer serializer) { BinaryMap binary = serializer.toBinaryMap(value); writeSegments(binary.getSegments(), binary.getOffset(), binary.getSizeInBytes()); @@ -659,6 +706,14 @@ private InternalArray readArray() { return value; } + private InternalVector readVector(int vectorLength) { + BinaryVector value = new BinaryVector(vectorLength); + int length = readUnsignedInt(); + value.pointTo(segments, position, length); + position += length; + return value; + } + private InternalMap readMap() { BinaryMap value = new BinaryMap(); int length = readUnsignedInt(); diff --git a/paimon-common/src/test/java/org/apache/paimon/data/columnar/RowToColumnConverterTest.java b/paimon-common/src/test/java/org/apache/paimon/data/columnar/RowToColumnConverterTest.java index 9f2ef2fc3c3b..95c20fc971b2 100644 --- a/paimon-common/src/test/java/org/apache/paimon/data/columnar/RowToColumnConverterTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/data/columnar/RowToColumnConverterTest.java @@ -19,10 +19,12 @@ package org.apache.paimon.data.columnar; import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.BinaryVector; import org.apache.paimon.data.Decimal; import org.apache.paimon.data.GenericArray; import org.apache.paimon.data.GenericMap; import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalVector; import org.apache.paimon.data.Timestamp; import org.apache.paimon.data.columnar.heap.HeapArrayVector; import org.apache.paimon.data.columnar.heap.HeapBooleanVector; @@ -35,6 +37,7 @@ import org.apache.paimon.data.columnar.heap.HeapMapVector; import org.apache.paimon.data.columnar.heap.HeapRowVector; import org.apache.paimon.data.columnar.heap.HeapShortVector; +import org.apache.paimon.data.columnar.heap.HeapVectorColumnVector; import org.apache.paimon.data.columnar.writable.WritableColumnVector; import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.BigIntType; @@ -57,11 +60,13 @@ import org.junit.jupiter.api.Test; +import java.lang.reflect.Proxy; import java.math.BigDecimal; import java.util.LinkedHashMap; import java.util.Map; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Test for {@link RowToColumnConverter}. */ public class RowToColumnConverterTest { @@ -373,6 +378,87 @@ public void testConvertArrayType() { assertThat(elementVector.getInt(7)).isEqualTo(8); } + @Test + public void testConvertVectorType() { + RowType rowType = RowType.of(new DataField(0, "f", DataTypes.VECTOR(3, DataTypes.FLOAT()))); + RowToColumnConverter converter = new RowToColumnConverter(rowType); + + GenericRow row1 = + GenericRow.of(BinaryVector.fromPrimitiveArray(new float[] {1.0f, 2.0f, 3.0f})); + GenericRow row2 = + GenericRow.of(BinaryVector.fromPrimitiveArray(new float[] {4.0f, 5.0f, 6.0f})); + + HeapFloatVector elementVector = new HeapFloatVector(6); + HeapVectorColumnVector vectorColumn = new HeapVectorColumnVector(2, elementVector, 3); + WritableColumnVector[] vectors = new WritableColumnVector[] {vectorColumn}; + + converter.convert(row1, vectors); + converter.convert(row2, vectors); + + assertThat(elementVector.getFloat(0)).isEqualTo(1.0f); + assertThat(elementVector.getFloat(1)).isEqualTo(2.0f); + assertThat(elementVector.getFloat(2)).isEqualTo(3.0f); + assertThat(elementVector.getFloat(3)).isEqualTo(4.0f); + assertThat(elementVector.getFloat(4)).isEqualTo(5.0f); + assertThat(elementVector.getFloat(5)).isEqualTo(6.0f); + assertThat(vectorColumn.getVector(0).toFloatArray()).containsExactly(1.0f, 2.0f, 3.0f); + assertThat(vectorColumn.getVector(1).toFloatArray()).containsExactly(4.0f, 5.0f, 6.0f); + } + + @Test + public void testConvertNullableVectorType() { + RowType rowType = RowType.of(new DataField(0, "f", DataTypes.VECTOR(3, DataTypes.FLOAT()))); + RowToColumnConverter converter = new RowToColumnConverter(rowType); + + GenericRow row1 = GenericRow.of((Object) null); + GenericRow row2 = + GenericRow.of(BinaryVector.fromPrimitiveArray(new float[] {1.0f, 2.0f, 3.0f})); + + HeapFloatVector elementVector = new HeapFloatVector(3); + HeapVectorColumnVector vectorColumn = new HeapVectorColumnVector(2, elementVector, 3); + WritableColumnVector[] vectors = new WritableColumnVector[] {vectorColumn}; + + converter.convert(row1, vectors); + converter.convert(row2, vectors); + + assertThat(vectorColumn.isNullAt(0)).isTrue(); + assertThat(elementVector.isNullAt(0)).isTrue(); + assertThat(elementVector.isNullAt(1)).isTrue(); + assertThat(elementVector.isNullAt(2)).isTrue(); + assertThat(elementVector.getFloat(3)).isEqualTo(1.0f); + assertThat(elementVector.getFloat(4)).isEqualTo(2.0f); + assertThat(elementVector.getFloat(5)).isEqualTo(3.0f); + assertThat(vectorColumn.getVector(1).toFloatArray()).containsExactly(1.0f, 2.0f, 3.0f); + } + + @Test + public void testConvertVectorTypeWithInvalidLength() { + RowType rowType = RowType.of(new DataField(0, "f", DataTypes.VECTOR(3, DataTypes.FLOAT()))); + RowToColumnConverter converter = new RowToColumnConverter(rowType); + + GenericRow row = GenericRow.of(BinaryVector.fromPrimitiveArray(new float[] {1.0f, 2.0f})); + HeapVectorColumnVector vectorColumn = + new HeapVectorColumnVector(1, new HeapFloatVector(2), 3); + + assertThatThrownBy(() -> converter.convert(row, new WritableColumnVector[] {vectorColumn})) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Vector length mismatch"); + } + + @Test + public void testConvertVectorTypeWithNullElement() { + RowType rowType = RowType.of(new DataField(0, "f", DataTypes.VECTOR(3, DataTypes.FLOAT()))); + RowToColumnConverter converter = new RowToColumnConverter(rowType); + + GenericRow row = GenericRow.of(createFloatVectorWithNullElement()); + HeapVectorColumnVector vectorColumn = + new HeapVectorColumnVector(1, new HeapFloatVector(3), 3); + + assertThatThrownBy(() -> converter.convert(row, new WritableColumnVector[] {vectorColumn})) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessageContaining("Vector elements must not be null"); + } + @Test public void testConvertMapType() { RowType rowType = @@ -507,4 +593,19 @@ public void testConvertNullableRowType() { assertThat(idVector.isNullAt(2)).isTrue(); assertThat(nameVector.isNullAt(2)).isTrue(); } + + private static InternalVector createFloatVectorWithNullElement() { + BinaryVector vector = BinaryVector.fromPrimitiveArray(new float[] {1.0f, 2.0f, 3.0f}); + Object proxy = + Proxy.newProxyInstance( + RowToColumnConverterTest.class.getClassLoader(), + new Class[] {InternalVector.class}, + (obj, method, args) -> { + if ("isNullAt".equals(method.getName()) && ((Integer) args[0]) == 1) { + return true; + } + return method.invoke(vector, args); + }); + return (InternalVector) proxy; + } } diff --git a/paimon-common/src/test/java/org/apache/paimon/data/serializer/RowCompactedSerializerTest.java b/paimon-common/src/test/java/org/apache/paimon/data/serializer/RowCompactedSerializerTest.java index 6e03a5699916..9b55fe2261a4 100644 --- a/paimon-common/src/test/java/org/apache/paimon/data/serializer/RowCompactedSerializerTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/data/serializer/RowCompactedSerializerTest.java @@ -19,6 +19,7 @@ package org.apache.paimon.data.serializer; import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.BinaryVector; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.variant.GenericVariant; @@ -26,11 +27,14 @@ import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; +import org.junit.jupiter.api.Test; + import static org.apache.paimon.data.BinaryString.fromString; import static org.apache.paimon.data.serializer.InternalRowSerializerTest.createArray; import static org.apache.paimon.data.serializer.InternalRowSerializerTest.createMap; import static org.apache.paimon.data.serializer.InternalRowSerializerTest.createRow; import static org.apache.paimon.data.serializer.InternalRowSerializerTest.deepEqualsInternalRow; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Test for {@link RowCompactedSerializer}. */ abstract class RowCompactedSerializerTest extends SerializerTestInstance { @@ -198,6 +202,41 @@ private static RowCompactedSerializer getRowSerializer() { } } + static final class VectorTypesTest extends RowCompactedSerializerTest { + public VectorTypesTest() { + super(getRowSerializer(), getData()); + } + + private static InternalRow[] getData() { + return new GenericRow[] { + GenericRow.of((Object) null), + GenericRow.of(BinaryVector.fromPrimitiveArray(new float[] {1.0f, 2.0f, 3.0f})), + GenericRow.of(BinaryVector.fromPrimitiveArray(new float[] {-1.0f, 0.5f, 4.5f})) + }; + } + + private static RowCompactedSerializer getRowSerializer() { + return new RowCompactedSerializer(RowType.of(DataTypes.VECTOR(3, DataTypes.FLOAT()))); + } + + @Test + public void testSerializeVectorWithInvalidLength() { + GenericRow row = + GenericRow.of(BinaryVector.fromPrimitiveArray(new float[] {1.0f, 2.0f})); + + assertThatThrownBy(() -> getRowSerializer().serializeToBytes(row)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Vector length mismatch"); + } + + @Test + public void testCreateSliceComparatorWithVectorType() { + assertThatThrownBy(() -> getRowSerializer().createSliceComparator()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("not comparable"); + } + } + static final class NestedInternalRowTest extends RowCompactedSerializerTest { private static final RowType NESTED_DATA_TYPE = diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java index fb4f5862685a..bbb147d19bc5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java @@ -45,6 +45,7 @@ import org.apache.paimon.types.MultisetType; import org.apache.paimon.types.RowType; import org.apache.paimon.types.TimestampType; +import org.apache.paimon.types.VectorType; import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.SetUtils; import org.apache.paimon.utils.StringUtils; @@ -103,7 +104,12 @@ public class SchemaValidation { public static final List> PRIMARY_KEY_UNSUPPORTED_LOGICAL_TYPES = - Arrays.asList(MapType.class, ArrayType.class, RowType.class, MultisetType.class); + Arrays.asList( + MapType.class, + ArrayType.class, + RowType.class, + MultisetType.class, + VectorType.class); /** * Validate the {@link TableSchema} and {@link CoreOptions}. diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java index ce714e27078f..f08c852c9773 100644 --- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java @@ -442,7 +442,7 @@ public void testVectorStoreContainsPartitionColumn() { emptyList(), options, ""))) - .hasMessage("The vector-store columns can not be part of partition keys."); + .hasMessage("The type VectorType in partition field f1 is unsupported"); } @Test @@ -471,6 +471,33 @@ public void testVectorStoreRequiresDataEvolutionEnabled() { "Data evolution config must enabled for table with vector-store file format."); } + @Test + public void testVectorTypeCanNotBeKey() { + assertThatThrownBy( + () -> + validateTableSchema( + vectorTypeSchema(emptyList(), singletonList("f1"), null))) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage( + "The type %s in primary key field %s is unsupported", "VectorType", "f1"); + + assertThatThrownBy( + () -> + validateTableSchema( + vectorTypeSchema(singletonList("f1"), emptyList(), null))) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("The type %s in partition field %s is unsupported", "VectorType", "f1"); + + assertThatThrownBy( + () -> + validateTableSchema( + vectorTypeSchema( + emptyList(), emptyList(), singletonList("f1")))) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage( + "The type %s in upsert key field %s is unsupported", "VectorType", "f1"); + } + @Test void testRowTrackingWithPkTable() { Map options = new HashMap<>(); @@ -1013,4 +1040,18 @@ public void testFullCompactionDeltaCommitsWithLookupChangelogProducer() { options.put(CoreOptions.CHANGELOG_PRODUCER.key(), "input"); assertThatCode(() -> validateTableSchemaExec(options)).doesNotThrowAnyException(); } + + private TableSchema vectorTypeSchema( + List partitionKeys, List primaryKeys, List upsertKeys) { + List fields = + Arrays.asList( + new DataField(0, "f0", DataTypes.INT()), + new DataField(1, "f1", DataTypes.VECTOR(3, DataTypes.FLOAT()))); + Map options = new HashMap<>(); + options.put(BUCKET.key(), String.valueOf(-1)); + if (upsertKeys != null) { + options.put(CoreOptions.UPSERT_KEY.key(), String.join(",", upsertKeys)); + } + return new TableSchema(1, fields, 10, partitionKeys, primaryKeys, options, ""); + } }