Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,18 @@

import org.apache.paimon.arrow.ArrowUtils;
import org.apache.paimon.arrow.writer.ArrowFieldWriter;
import org.apache.paimon.data.BinaryVector;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.InternalVector;
import org.apache.paimon.data.columnar.ColumnVector;
import org.apache.paimon.data.columnar.ColumnarVec;
import org.apache.paimon.data.columnar.RowToColumnConverter;
import org.apache.paimon.data.columnar.VecColumnVector;
import org.apache.paimon.data.columnar.VectorizedColumnBatch;
import org.apache.paimon.data.columnar.heap.HeapFloatVector;
import org.apache.paimon.data.columnar.heap.HeapVectorColumnVector;
import org.apache.paimon.data.columnar.writable.WritableColumnVector;
import org.apache.paimon.reader.VectorizedRecordIterator;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
Expand Down Expand Up @@ -96,6 +101,56 @@ public void releaseBatch() {}
}
}

@Test
public void testNullableVectorColumnFromRowToColumnConverter() {
RowType rowType = RowType.of(DataTypes.VECTOR(3, DataTypes.FLOAT()));
RowToColumnConverter rowToColumnConverter = new RowToColumnConverter(rowType);

HeapFloatVector elementVector = new HeapFloatVector(6);
HeapVectorColumnVector vectorColumn = new HeapVectorColumnVector(2, elementVector, 3);
WritableColumnVector[] vectors = new WritableColumnVector[] {vectorColumn};
rowToColumnConverter.convert(GenericRow.of((Object) null), vectors);
rowToColumnConverter.convert(
GenericRow.of(BinaryVector.fromPrimitiveArray(new float[] {1.0f, 2.0f, 3.0f})),
vectors);

try (RootAllocator allocator = new RootAllocator()) {
VectorSchemaRoot vsr = ArrowUtils.createVectorSchemaRoot(rowType, allocator);
ArrowFieldWriter[] fieldWriters = ArrowUtils.createArrowFieldWriters(vsr, rowType);
VectorizedColumnBatch batch =
new VectorizedColumnBatch(new ColumnVector[] {vectorColumn});
batch.setNumRows(2);

ArrowVectorizedBatchConverter converter =
new ArrowVectorizedBatchConverter(vsr, fieldWriters);
converter.reset(
new VectorizedRecordIterator() {
@Override
public VectorizedColumnBatch batch() {
return batch;
}

@Override
public InternalRow next() {
return null;
}

@Override
public void releaseBatch() {}
});
converter.next(2);

FixedSizeListVector listVector = (FixedSizeListVector) vsr.getVector(0);
assertThat(listVector.isNull(0)).isTrue();
assertThat(listVector.getObject(0)).isNull();
@SuppressWarnings("unchecked")
List<Float> row1 = (List<Float>) listVector.getObject(1);
assertThat(row1).containsExactly(1.0f, 2.0f, 3.0f);

converter.close();
}
}

@Test
public void testVectorColumnWriteWithPickedInColumn() {
RowType rowType = RowType.of(DataTypes.VECTOR(2, DataTypes.FLOAT()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.InternalVector;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.data.columnar.heap.HeapArrayVector;
import org.apache.paimon.data.columnar.heap.HeapMapVector;
import org.apache.paimon.data.columnar.heap.HeapRowVector;
import org.apache.paimon.data.columnar.heap.HeapVectorColumnVector;
import org.apache.paimon.data.columnar.writable.WritableBooleanVector;
import org.apache.paimon.data.columnar.writable.WritableByteVector;
import org.apache.paimon.data.columnar.writable.WritableBytesVector;
Expand Down Expand Up @@ -284,7 +286,31 @@ public TypeConverter visit(ArrayType arrayType) {

@Override
public TypeConverter visit(VectorType vectorType) {
throw new UnsupportedOperationException();
TypeConverter elementConverter =
getConverterForType(vectorType.getElementType().notNull());
return createConverter(
vectorType.isNullable(),
(row, column, cv) -> {
HeapVectorColumnVector vectorColumn = (HeapVectorColumnVector) cv;
if (vectorColumn.getVectorSize() != vectorType.getLength()) {
throw new IllegalArgumentException(
"Vector column length mismatch: expected "
+ vectorType.getLength()
+ " but got "
+ vectorColumn.getVectorSize());
}

InternalVector values = row.getVector(column);
checkVectorLength(values, vectorType.getLength());
checkVectorElementsNonNull(values);
vectorColumn.appendVector();

WritableColumnVector vectorData =
(WritableColumnVector) vectorColumn.getColumnVector();
for (int i = 0; i < values.size(); i++) {
elementConverter.append(values, i, vectorData);
}
});
}

@Override
Expand Down Expand Up @@ -374,6 +400,25 @@ private static TypeConverter timestampConverter(boolean nullable, int precision)
}
});
}

private static void checkVectorLength(InternalVector vector, int expectedLength) {
if (vector.size() != expectedLength) {
throw new IllegalArgumentException(
"Vector length mismatch: expected "
+ expectedLength
+ " but got "
+ vector.size());
}
}

private static void checkVectorElementsNonNull(InternalVector vector) {
for (int i = 0; i < vector.size(); i++) {
if (vector.isNullAt(i)) {
throw new UnsupportedOperationException(
"Vector elements must not be null.");
}
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.data.columnar.heap;

import org.apache.paimon.data.InternalVector;
import org.apache.paimon.data.columnar.ColumnVector;
import org.apache.paimon.data.columnar.ColumnarVec;
import org.apache.paimon.data.columnar.VecColumnVector;
import org.apache.paimon.data.columnar.writable.WritableColumnVector;

/** Heap column vector for VectorType. */
public class HeapVectorColumnVector extends AbstractArrayBasedVector implements VecColumnVector {

private final int vectorSize;

public HeapVectorColumnVector(int len, ColumnVector vector, int vectorSize) {
super(len, new ColumnVector[] {vector});
if (vectorSize <= 0) {
throw new IllegalArgumentException("Vector size must be positive.");
}
this.vectorSize = vectorSize;
}

public void appendVector() {
reserve(elementsAppended + 1);
long offset = (long) elementsAppended * vectorSize;
reserveChild(offset + vectorSize);
putOffsetLength(elementsAppended, offset, vectorSize);
elementsAppended++;
}

@Override
public void appendNull() {
int index = elementsAppended;
appendVector();
setNullAt(index);

ColumnVector child = children[0];
if (child instanceof WritableColumnVector) {
WritableColumnVector writableChild = (WritableColumnVector) child;
int offset = (int) offsets[index];
writableChild.setNulls(offset, vectorSize);
writableChild.addElementsAppended(vectorSize);
}
}

@Override
public InternalVector getVector(int i) {
long offset = offsets[i];
long length = lengths[i];
if (length != vectorSize) {
throw new IllegalArgumentException(
"Vector length mismatch: expected " + vectorSize + " but got " + length);
}
return ColumnarVec.DEFAULT_FACTORY.create(children[0], (int) offset, (int) length);
}

@Override
public ColumnVector getColumnVector() {
return children[0];
}

@Override
public int getVectorSize() {
return vectorSize;
}

private void reserveChild(long requiredCapacity) {
if (requiredCapacity > Integer.MAX_VALUE) {
throw new UnsupportedOperationException(
"Cannot allocate " + requiredCapacity + " vector elements");
}
ColumnVector child = children[0];
if (child instanceof WritableColumnVector) {
((WritableColumnVector) child).reserve((int) requiredCapacity);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@
import org.apache.paimon.data.BinaryArray;
import org.apache.paimon.data.BinaryMap;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.BinaryVector;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.InternalRow.FieldGetter;
import org.apache.paimon.data.InternalVector;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.data.variant.Variant;
import org.apache.paimon.io.DataInputView;
Expand All @@ -37,6 +39,8 @@
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.VectorType;
import org.apache.paimon.utils.TypeCheckUtils;
import org.apache.paimon.utils.VarLengthIntUtils;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -164,6 +168,7 @@ public InternalRow deserialize(byte[] bytes) {
}

public Comparator<MemorySlice> createSliceComparator() {
checkComparableFields(rowType);
return new SliceComparator(rowType);
}

Expand Down Expand Up @@ -235,6 +240,18 @@ private static FieldWriter createFieldWriter(DataType fieldType) {
(InternalArray) value,
(InternalArraySerializer) arraySerializer);
break;
case VECTOR:
VectorType vectorType = (VectorType) fieldType;
InternalVectorSerializer vectorSerializer =
new InternalVectorSerializer(
vectorType.getElementType(), vectorType.getLength());
fieldWriter =
(writer, pos, value) -> {
InternalVector vector = (InternalVector) value;
checkVectorLength(vector, vectorType.getLength());
writer.writeVector(vector, vectorSerializer);
};
break;
case MULTISET:
case MAP:
Serializer<InternalMap> mapSerializer = InternalSerializers.create(fieldType);
Expand Down Expand Up @@ -329,6 +346,10 @@ private static FieldReader createFieldReader(DataType fieldType) {
case ARRAY:
fieldReader = (reader, pos) -> reader.readArray();
break;
case VECTOR:
VectorType vectorType = (VectorType) fieldType;
fieldReader = (reader, pos) -> reader.readVector(vectorType.getLength());
break;
case MULTISET:
case MAP:
fieldReader = (reader, pos) -> reader.readMap();
Expand Down Expand Up @@ -356,6 +377,27 @@ private static FieldReader createFieldReader(DataType fieldType) {
};
}

private static void checkVectorLength(InternalVector vector, int expectedLength) {
if (vector.size() != expectedLength) {
throw new IllegalArgumentException(
"Vector length mismatch: expected "
+ expectedLength
+ " but got "
+ vector.size());
}
}

private static void checkComparableFields(RowType rowType) {
for (int i = 0; i < rowType.getFieldCount(); i++) {
DataType type = rowType.getTypeAt(i);
checkArgument(
TypeCheckUtils.isComparable(type),
"Field %s with type %s is not comparable in slice comparator.",
rowType.getFields().get(i).name(),
type);
}
}

private interface FieldWriter extends Serializable {
void writeField(RowWriter writer, int pos, Object value);
}
Expand Down Expand Up @@ -467,6 +509,11 @@ private void writeArray(InternalArray value, InternalArraySerializer serializer)
writeSegments(binary.getSegments(), binary.getOffset(), binary.getSizeInBytes());
}

private void writeVector(InternalVector value, InternalVectorSerializer serializer) {
BinaryVector binary = serializer.toBinaryVector(value);
writeSegments(binary.getSegments(), binary.getOffset(), binary.getSizeInBytes());
}

private void writeMap(InternalMap value, InternalMapSerializer serializer) {
BinaryMap binary = serializer.toBinaryMap(value);
writeSegments(binary.getSegments(), binary.getOffset(), binary.getSizeInBytes());
Expand Down Expand Up @@ -659,6 +706,14 @@ private InternalArray readArray() {
return value;
}

private InternalVector readVector(int vectorLength) {
BinaryVector value = new BinaryVector(vectorLength);
int length = readUnsignedInt();
value.pointTo(segments, position, length);
position += length;
return value;
}

private InternalMap readMap() {
BinaryMap value = new BinaryMap();
int length = readUnsignedInt();
Expand Down
Loading
Loading