diff --git a/docs/generated/core_configuration.html b/docs/generated/core_configuration.html index 3c88e1211404..be1e34b6f2a8 100644 --- a/docs/generated/core_configuration.html +++ b/docs/generated/core_configuration.html @@ -476,6 +476,24 @@ Boolean Whether enable data evolution for row tracking table. + +
data-evolution.merge-into.file-pruning
+ true + Boolean + If true, enables the file-level pruning step for MergeInto partial column update on data-evolution tables. Set this to false when most files in the target partition are expected to be updated, so that the overhead of collecting touched file IDs outweighs the benefit of pruning untouched files. + + +
data-evolution.merge-into.source-persist
+ false + Boolean + Whether to persist source when process merge into action on data evolution table. + + +
data-evolution.nested-field.enabled
+ false + Boolean + Whether to enable sub-field-level data evolution for nested (struct) columns. When enabled, an update that only touches some sub-fields of a nested column writes an incremental file containing just those sub-fields (aligned by row id); when disabled, the whole top-level column is rewritten. Requires data-evolution.enabled=true. +
data-evolution.row-sidecar.enabled
false @@ -494,18 +512,6 @@ Double Maximum selected row ratio for reading a row-store sidecar file. The value must be in (0, 1]. The sidecar is used only when the selected row ratio is no more than this value and the selected row count is no more than data-evolution.row-sidecar.max-selected-rows. - -
data-evolution.merge-into.file-pruning
- true - Boolean - If true, enables the file-level pruning step for MergeInto partial column update on data-evolution tables. Set this to false when most files in the target partition are expected to be updated, so that the overhead of collecting touched file IDs outweighs the benefit of pruning untouched files. - - -
data-evolution.merge-into.source-persist
- false - Boolean - Whether to persist source when process merge into action on data evolution table. -
data-file.external-paths
(none) diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index e3b6b01952cc..88eaa65617e8 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -2354,6 +2354,18 @@ public String toString() { .defaultValue(false) .withDescription("Whether enable data evolution for row tracking table."); + public static final ConfigOption DATA_EVOLUTION_NESTED_FIELD_ENABLED = + key("data-evolution.nested-field.enabled") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether to enable sub-field-level data evolution for nested (struct) " + + "columns. When enabled, an update that only touches some " + + "sub-fields of a nested column writes an incremental file " + + "containing just those sub-fields (aligned by row id); when " + + "disabled, the whole top-level column is rewritten. Requires " + + "data-evolution.enabled=true."); + public static final ConfigOption DATA_EVOLUTION_ROW_SIDECAR_ENABLED = key("data-evolution.row-sidecar.enabled") .booleanType() @@ -3958,6 +3970,10 @@ public boolean dataEvolutionEnabled() { return options.get(DATA_EVOLUTION_ENABLED); } + public boolean dataEvolutionNestedFieldEnabled() { + return options.get(DATA_EVOLUTION_NESTED_FIELD_ENABLED); + } + public boolean dataEvolutionRowSidecarEnabled() { return options.get(DATA_EVOLUTION_ROW_SIDECAR_ENABLED); } diff --git a/paimon-api/src/main/java/org/apache/paimon/schema/TableSchema.java b/paimon-api/src/main/java/org/apache/paimon/schema/TableSchema.java index 8fee740fd1cd..9a429f54da28 100644 --- a/paimon-api/src/main/java/org/apache/paimon/schema/TableSchema.java +++ b/paimon-api/src/main/java/org/apache/paimon/schema/TableSchema.java @@ -288,7 +288,9 @@ public TableSchema project(@Nullable List writeCols) { return new TableSchema( version, id, - new RowType(fields).project(writeCols).getFields(), + // writeCols may 
contain nested dotted paths (e.g. "nest.a") for sub-field-level + // data evolution; projectByPaths handles both plain top-level names and paths + new RowType(fields).projectByPaths(writeCols).getFields(), highestFieldId, partitionKeys, primaryKeys, diff --git a/paimon-api/src/main/java/org/apache/paimon/types/RowType.java b/paimon-api/src/main/java/org/apache/paimon/types/RowType.java index e592ff71619b..87d28db02151 100644 --- a/paimon-api/src/main/java/org/apache/paimon/types/RowType.java +++ b/paimon-api/src/main/java/org/apache/paimon/types/RowType.java @@ -312,6 +312,165 @@ public RowType project(String... names) { return project(Arrays.asList(names)); } + /** + * Project this row type by a list of (possibly nested) dotted paths, e.g. {@code ["f0", + * "nest.a"]}. A path without a dot selects the whole top-level field (same as {@link + * #project(List)}); a dotted path selects only the addressed sub-field of a nested {@link + * RowType}, preserving field ids and nullability of every level. Schema field order is + * preserved. This is used by data evolution to reconstruct the partial nested schema of a + * column-group file from its {@code writeCols}. + */ + public RowType projectByPaths(List paths) { + return projectTypeByPaths(this, paths); + } + + private static RowType projectTypeByPaths(RowType type, List paths) { + // group paths by their immediate child name; a child appearing without a tail (or also with + // a tail) is selected as a whole field + Map> childToSubPaths = new HashMap<>(); + Set wholeChildren = new HashSet<>(); + Set fieldNames = new HashSet<>(); + for (DataField field : type.getFields()) { + fieldNames.add(field.name()); + } + for (String path : paths) { + int dot = path.indexOf('.'); + // Prefer an exact field-name match so a column whose name itself contains a dot (and + // any + // plain top-level name) is selected whole; only split into head.tail for genuine nested + // sub-field paths that do not name a field directly. 
This keeps backward compatibility + // with the legacy exact-name project(List). + if (dot < 0 || fieldNames.contains(path)) { + childToSubPaths.computeIfAbsent(path, k -> new ArrayList<>()); + wholeChildren.add(path); + } else { + String head = path.substring(0, dot); + String tail = path.substring(dot + 1); + childToSubPaths.computeIfAbsent(head, k -> new ArrayList<>()).add(tail); + } + } + + Set matched = new HashSet<>(); + List result = new ArrayList<>(); + for (DataField field : type.getFields()) { + List subPaths = childToSubPaths.get(field.name()); + if (subPaths == null) { + continue; + } + matched.add(field.name()); + if (wholeChildren.contains(field.name()) || subPaths.isEmpty()) { + result.add(field); + } else if (field.type() instanceof RowType) { + RowType prunedChild = + projectTypeByPaths((RowType) field.type(), subPaths) + .copy(field.type().isNullable()); + result.add(field.newType(prunedChild)); + } else { + // a dotted path addresses a sub-field, but this field is not a ROW; reject rather + // than silently selecting the whole field, so invalid dotted paths surface early + throw new IllegalArgumentException( + "Cannot project sub-field(s) " + + subPaths + + " of non-ROW field '" + + field.name() + + "' in " + + type); + } + } + if (!matched.containsAll(childToSubPaths.keySet())) { + Set unknown = new HashSet<>(childToSubPaths.keySet()); + unknown.removeAll(matched); + throw new IllegalArgumentException( + "Cannot project by paths, unknown field(s) " + unknown + " in " + type); + } + return new RowType(type.isNullable(), result); + } + + /** + * Compute the dotted paths describing this (possibly partially nested) write type relative to a + * full row type. A top-level field, or a nested field whose structure fully covers the + * corresponding field in {@code fullType}, is emitted by its name; a nested field that only + * covers some sub-fields is expanded into dotted leaf paths. 
This is the inverse of {@link + * #projectByPaths(List)} and is used to derive {@code writeCols}. + */ + public List leafPaths(RowType fullType) { + List result = new ArrayList<>(); + collectLeafPaths(getFields(), fullType, "", result); + return result; + } + + private static void collectLeafPaths( + List writeFields, RowType fullType, String prefix, List out) { + for (DataField writeField : writeFields) { + String path = prefix.isEmpty() ? writeField.name() : prefix + "." + writeField.name(); + // A field absent from the reference type (e.g. the _ROW_ID / _SEQUENCE_NUMBER special + // fields added by row tracking, which are not part of the table's logical row type) has + // no sub-field split: emit it whole by name, matching the legacy getFieldNames() + // output. + if (!fullType.containsField(writeField.id())) { + out.add(path); + continue; + } + DataField fullField = fullType.getField(writeField.id()); + boolean willExpand = + writeField.type() instanceof RowType + && fullField.type() instanceof RowType + && !coversFully( + (RowType) writeField.type(), (RowType) fullField.type()); + // A dotted path is only unambiguous if no name segment contains a literal '.'. A name + // with a dot is fine when emitted whole at top level (projectByPaths matches it + // exactly), + // but not when it participates in a multi-segment nested path. + if (writeField.name().indexOf('.') >= 0 && (!prefix.isEmpty() || willExpand)) { + throw new UnsupportedOperationException( + "Sub-field-level data evolution does not support a nested field whose name " + + "contains '.': " + + path); + } + if (willExpand) { + // A partial struct nested inside another partial struct (a path deeper than one + // level, e.g. nest.sub.x) cannot be composed back on read — the data-evolution read + // path only assembles one nested level. Reject it here so such a file is never + // written/committed and later breaks full-table reads. 
+ if (!prefix.isEmpty()) { + throw new UnsupportedOperationException( + "Sub-field-level data evolution supports only one level of partial " + + "nesting; the nested sub-field '" + + path + + "' cannot be partially written. Write the whole '" + + path + + "' sub-field instead."); + } + collectLeafPaths( + ((RowType) writeField.type()).getFields(), + (RowType) fullField.type(), + path, + out); + } else { + out.add(path); + } + } + } + + /** Whether {@code part} contains every (recursively nested) field of {@code full}. */ + private static boolean coversFully(RowType part, RowType full) { + if (part.getFieldCount() != full.getFieldCount()) { + return false; + } + for (DataField fullField : full.getFields()) { + if (!part.containsField(fullField.id())) { + return false; + } + DataField partField = part.getField(fullField.id()); + if (partField.type() instanceof RowType && fullField.type() instanceof RowType) { + if (!coversFully((RowType) partField.type(), (RowType) fullField.type())) { + return false; + } + } + } + return true; + } + private Map nameToField() { Map nameToField = this.laziedNameToField; if (nameToField == null) { diff --git a/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionFileReader.java b/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionFileReader.java index 53766440ff01..76e00e049eb5 100644 --- a/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionFileReader.java +++ b/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionFileReader.java @@ -57,9 +57,18 @@ public class DataEvolutionFileReader implements RecordReader { private final int[] rowOffsets; private final int[] fieldOffsets; private final RecordReader[] readers; + @Nullable private final DataEvolutionRow.NestedField[] nested; public DataEvolutionFileReader( int[] rowOffsets, int[] fieldOffsets, RecordReader[] readers) { + this(rowOffsets, fieldOffsets, readers, null); + } + + public DataEvolutionFileReader( + int[] rowOffsets, + 
int[] fieldOffsets, + RecordReader[] readers, + @Nullable DataEvolutionRow.NestedField[] nested) { checkArgument(rowOffsets != null, "Row offsets must not be null"); checkArgument(fieldOffsets != null, "Field offsets must not be null"); checkArgument( @@ -70,12 +79,14 @@ public DataEvolutionFileReader( this.rowOffsets = rowOffsets; this.fieldOffsets = fieldOffsets; this.readers = readers; + this.nested = nested; } @Override @Nullable public RecordIterator readBatch() throws IOException { DataEvolutionRow row = new DataEvolutionRow(readers.length, rowOffsets, fieldOffsets); + row.setNested(nested); RecordIterator[] iterators = new RecordIterator[readers.length]; for (int i = 0; i < readers.length; i++) { RecordReader reader = readers[i]; diff --git a/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionRow.java b/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionRow.java index 08c6d24d2b79..da36b9acd0bf 100644 --- a/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionRow.java +++ b/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionRow.java @@ -35,6 +35,14 @@ public class DataEvolutionRow implements InternalRow { private final InternalRow[] rows; private final int[] rowOffsets; private final int[] fieldOffsets; + + /** + * Optional per-top-level-field plan to assemble a nested struct whose sub-fields are spread + * across several source files (sub-field-level data evolution). {@code null} (or a {@code null} + * entry) means the field is taken whole from a single source row (the common case). 
+ */ + private NestedField[] nested; + private RowKind rowKind; public DataEvolutionRow(int rowNumber, int[] rowOffsets, int[] fieldOffsets) { @@ -43,6 +51,10 @@ public DataEvolutionRow(int rowNumber, int[] rowOffsets, int[] fieldOffsets) { this.fieldOffsets = fieldOffsets; } + public void setNested(NestedField[] nested) { + this.nested = nested; + } + public int rowNumber() { return rows.length; } @@ -59,6 +71,21 @@ public void setRow(int pos, InternalRow row) { } } + private void setRowsAllowNull(InternalRow[] newRows) { + for (int i = 0; i < newRows.length; i++) { + this.rows[i] = newRows[i]; + if (rowKind == null && newRows[i] != null) { + this.rowKind = newRows[i].getRowKind(); + } + } + if (rowKind == null) { + // a composed struct whose every source partial is null still needs a defined kind so + // getRowKind() never returns null; the kind of an assembled struct value is not + // meaningful, so default to INSERT + this.rowKind = RowKind.INSERT; + } + } + public void setRows(InternalRow[] rows) { if (rows.length != this.rows.length) { throw new IllegalArgumentException( @@ -97,10 +124,22 @@ public void setRowKind(RowKind kind) { @Override public boolean isNullAt(int pos) { + if (nested != null && nested[pos] != null) { + // a composed struct is null only when none of its source files provide it + NestedField nf = nested[pos]; + for (int k = 0; k < nf.numPartials; k++) { + InternalRow src = rows[nf.partialReader[k]]; + if (src != null && !src.isNullAt(nf.partialOffset[k])) { + return false; + } + } + return true; + } if (rowOffsets[pos] < 0) { return true; } - return chooseRow(pos).isNullAt(offsetInRow(pos)); + InternalRow row = chooseRow(pos); + return row == null || row.isNullAt(offsetInRow(pos)); } @Override @@ -185,6 +224,54 @@ public InternalMap getMap(int pos) { @Override public InternalRow getRow(int pos, int numFields) { + if (nested != null && nested[pos] != null) { + NestedField nf = nested[pos]; + InternalRow[] partials = new 
InternalRow[nf.numPartials]; + for (int k = 0; k < nf.numPartials; k++) { + InternalRow src = rows[nf.partialReader[k]]; + partials[k] = + (src == null || src.isNullAt(nf.partialOffset[k])) + ? null + : src.getRow(nf.partialOffset[k], nf.partialSize[k]); + } + DataEvolutionRow composed = + new DataEvolutionRow(nf.numPartials, nf.subRowOffsets, nf.subFieldOffsets); + composed.setRowsAllowNull(partials); + return composed; + } return chooseRow(pos).getRow(offsetInRow(pos), numFields); } + + /** + * Plan to assemble one nested struct from sub-fields spread across several source files. A + * "partial" is the projection of the struct read from a single source file; each output + * sub-field is sourced from one partial via {@code subRowOffsets}/{@code subFieldOffsets}. + */ + public static class NestedField { + + final int numPartials; + // per partial struct: the source reader, the struct's position in that reader's row, and + // the + // number of fields of the struct as read from that file + final int[] partialReader; + final int[] partialOffset; + final int[] partialSize; + // per output sub-field: which partial it comes from, and its offset within that partial + final int[] subRowOffsets; + final int[] subFieldOffsets; + + public NestedField( + int[] partialReader, + int[] partialOffset, + int[] partialSize, + int[] subRowOffsets, + int[] subFieldOffsets) { + this.numPartials = partialReader.length; + this.partialReader = partialReader; + this.partialOffset = partialOffset; + this.partialSize = partialSize; + this.subRowOffsets = subRowOffsets; + this.subFieldOffsets = subFieldOffsets; + } + } } diff --git a/paimon-common/src/test/java/org/apache/paimon/types/DataTypesTest.java b/paimon-common/src/test/java/org/apache/paimon/types/DataTypesTest.java index df4cf0679be0..5d4c9c0f1964 100644 --- a/paimon-common/src/test/java/org/apache/paimon/types/DataTypesTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/types/DataTypesTest.java @@ -206,6 +206,97 @@ void 
testRowType() { .isInstanceOf(IllegalArgumentException.class); } + @Test + void testProjectByPaths() { + RowType type = + new RowType( + Arrays.asList( + new DataField(0, "id", DataTypes.INT()), + new DataField( + 1, + "nest", + DataTypes.ROW( + new DataField(2, "a", DataTypes.INT()), + new DataField(3, "b", DataTypes.STRING()))))); + + // dotted path selects only the addressed sub-field, preserving ids + RowType onlyNestA = type.projectByPaths(Collections.singletonList("nest.a")); + Assertions.assertThat(onlyNestA.getFieldNames()).containsExactly("nest"); + RowType nestSub = (RowType) onlyNestA.getField("nest").type(); + Assertions.assertThat(nestSub.getFieldNames()).containsExactly("a"); + Assertions.assertThat(nestSub.getField("a").id()).isEqualTo(2); + + // a plain top-level name selects the whole field + RowType wholeNest = + (RowType) + type.projectByPaths(Collections.singletonList("nest")) + .getField("nest") + .type(); + Assertions.assertThat(wholeNest.getFieldNames()).containsExactly("a", "b"); + + // mixing a whole column and a sub-field + Assertions.assertThat(type.projectByPaths(Arrays.asList("id", "nest.b")).getFieldNames()) + .containsExactly("id", "nest"); + + // a dotted path whose head is not a ROW is rejected (not silently widened to the scalar) + assertThatThrownBy(() -> type.projectByPaths(Collections.singletonList("id.a"))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("non-ROW field 'id'"); + + // an unknown path is rejected + assertThatThrownBy(() -> type.projectByPaths(Collections.singletonList("missing"))) + .isInstanceOf(IllegalArgumentException.class); + + // a column whose name itself contains a dot is matched exactly (not split) + RowType dotted = + new RowType(Collections.singletonList(new DataField(0, "a.b", DataTypes.INT()))); + Assertions.assertThat( + dotted.projectByPaths(Collections.singletonList("a.b")).getFieldNames()) + .containsExactly("a.b"); + } + + @Test + void testLeafPaths() { + RowType full = + 
new RowType( + Arrays.asList( + new DataField(0, "id", DataTypes.INT()), + new DataField( + 1, + "nest", + DataTypes.ROW( + new DataField(2, "a", DataTypes.INT()), + new DataField( + 3, + "sub", + DataTypes.ROW( + new DataField( + 4, "x", DataTypes.INT()), + new DataField( + 5, + "y", + DataTypes.INT()))))))); + + // a full write collapses to top-level names (no dotted paths) + Assertions.assertThat(full.leafPaths(full)).containsExactly("id", "nest"); + + // one level of partial nesting: a direct sub-field of a top-level struct + Assertions.assertThat( + full.projectByPaths(Collections.singletonList("nest.a")).leafPaths(full)) + .containsExactly("nest.a"); + + // a whole sub-struct under a partial top-level struct is still one level + Assertions.assertThat( + full.projectByPaths(Collections.singletonList("nest.sub")).leafPaths(full)) + .containsExactly("nest.sub"); + + // deeper than one level (a partial sub-struct) is rejected so it can never be committed + RowType deepPartial = full.projectByPaths(Collections.singletonList("nest.sub.x")); + assertThatThrownBy(() -> deepPartial.leafPaths(full)) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessageContaining("one level"); + } + // -------------------------------------------------------------------------------------------- private static ThrowingConsumer baseAssertions(String sqlString, DataType otherType) { diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java index 574143192e19..ff01d90b9b30 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java @@ -165,9 +165,10 @@ public void withWriteType(RowType writeType) { if (blobContext != null) { blobContext = blobContext.withWriteType(writeType); } - int fullCount = rowType.getFieldCount(); List 
fullNames = rowType.getFieldNames(); - this.writeCols = writeType.getFieldNames(); + // writeCols carries (possibly nested) dotted paths, e.g. ["f0", "nest.a"]; a plain + // top-level name means the whole column, a dotted path means only that sub-field is written + this.writeCols = writeType.leafPaths(rowType); // optimize writeCols to null in following cases: // writeType contains all columns (without _ROW_ID and _SEQUENCE_NUMBER) if (writeCols.equals(fullNames)) { @@ -279,7 +280,9 @@ private RowDataRollingFileWriter createRollingFileWriter( FileSource.COMPACT, options.asyncFileWrite(), options.statsDenseStore(), - rowType.equals(writeType) ? null : writeType.getFieldNames(), + // use the same dotted-leaf-path encoding as withWriteType so a partial nested + // writeType records its real sub-field content consistently across write paths + rowType.equals(writeType) ? null : writeType.leafPaths(rowType), rowSidecarFileFormat()); } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java index b5ea80b0606e..a16c19cda5ad 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java @@ -333,6 +333,15 @@ static EvolutionStats evolutionStats( for (int k = 0; k < fieldIdsWithStats.length; k++) { if (fieldId == fieldIdsWithStats[k]) { DataType fileType = dataFileSchemaWithStats.fields().get(k).type(); + // A sub-field-level data evolution file may store only part of a + // nested struct (e.g. nest of nest); its file type then + // does + // not equal the full target struct. 
We intentionally skip stats in + // that case (leaving the field as "no stats" so no file is wrongly + // pruned) rather than composing partial-struct stats across files; + // struct columns rarely carry useful min/max and data evolution + // does + // not push predicates down, so the lost benefit is negligible. if (!fileType.equalsIgnoreFieldId(targetType)) { continue loop1; } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionReadPlanner.java b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionReadPlanner.java new file mode 100644 index 000000000000..975b2ba8fd6c --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionReadPlanner.java @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.operation; + +import org.apache.paimon.reader.DataEvolutionRow; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.RowType; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.paimon.utils.Preconditions.checkArgument; + +/** + * Pure (no-IO) planner for sub-field-level data evolution reads. Given the requested read row type + * and, for each column-group file ("bunch"), the row type it physically provides (its written + * columns, already wrapped with row-tracking fields), it decides for every read field whether it is + * taken whole from a single file or composed sub-field by sub-field across several files (latest + * file wins per leaf), and produces the offset maps and the per-field {@link + * DataEvolutionRow.NestedField} assembly plans. + * + *
<p>
Separating this from {@link DataEvolutionSplitRead} keeps the reader-building (IO) thin and + * lets the layout logic be unit-tested directly. Only one level of nested composition is supported; + * deeper or cross-file splits of a sub-struct throw {@link UnsupportedOperationException}. + */ +class DataEvolutionReadPlanner { + + private final RowType readRowType; + // for each bunch, the (row-tracked) row type it physically provides + private final List bunchAvailTypes; + + DataEvolutionReadPlanner(RowType readRowType, List bunchAvailTypes) { + this.readRowType = readRowType; + this.bunchAvailTypes = bunchAvailTypes; + } + + DataEvolutionReadPlan plan() { + List allReadFields = readRowType.getFields(); + int numFields = allReadFields.size(); + int numBunches = bunchAvailTypes.size(); + + // gather, per bunch, the set of leaf field ids it physically provides + List> bunchLeaves = new ArrayList<>(); + for (int i = 0; i < numBunches; i++) { + Set leaves = new HashSet<>(); + collectLeafIds(bunchAvailTypes.get(i).getFields(), leaves); + bunchLeaves.add(leaves); + } + + // decide, per read field, whether it is taken whole from one file or composed from several + // files at sub-field granularity. Files are already sorted latest-first, so the first bunch + // providing a leaf wins (latest-wins semantics, now at sub-field level). 
+ // selection per bunch: topFieldId -> null (whole) or set of selected sub-field ids + List>> bunchSelection = new ArrayList<>(); + for (int i = 0; i < numBunches; i++) { + bunchSelection.add(new LinkedHashMap<>()); + } + + int[] rowOffsets = new int[numFields]; + int[] fieldOffsets = new int[numFields]; + Arrays.fill(rowOffsets, -1); + Arrays.fill(fieldOffsets, -1); + DataEvolutionRow.NestedField[] nested = new DataEvolutionRow.NestedField[numFields]; + boolean[] composite = new boolean[numFields]; + int[] wholeBunch = new int[numFields]; + Arrays.fill(wholeBunch, -1); + + for (int j = 0; j < numFields; j++) { + DataField rf = allReadFields.get(j); + List leaves = leafIdsOf(rf); + Map leafProvider = new HashMap<>(); + Set providers = new HashSet<>(); + for (int leaf : leaves) { + int p = providerOf(leaf, bunchLeaves); + if (p >= 0) { + leafProvider.put(leaf, p); + providers.add(p); + } + } + if (providers.isEmpty()) { + // no file provides this field; it stays null (nullability checked below) + continue; + } + // Only read a field whole from a single file when that file covers ALL of its leaves. + // If a single file provides only some leaves of a struct (the rest absent everywhere), + // we must prune to the provided sub-fields so the reader is not asked for sub-fields + // the + // file does not physically contain; the missing ones stay null via the composite plan. 
+ boolean allLeavesCovered = leafProvider.size() == leaves.size(); + if (providers.size() == 1 && allLeavesCovered) { + int b = providers.iterator().next(); + bunchSelection.get(b).put(rf.id(), null); + wholeBunch[j] = b; + } else { + checkArgument( + rf.type() instanceof RowType, + "Field %s is split across files but is not a struct.", + rf.name()); + composite[j] = true; + for (DataField sub : ((RowType) rf.type()).getFields()) { + List subLeaves = leafIdsOf(sub); + Set subProviders = new HashSet<>(); + int coveredSubLeaves = 0; + for (int leaf : subLeaves) { + int p = leafProvider.getOrDefault(leaf, -1); + if (p >= 0) { + subProviders.add(p); + coveredSubLeaves++; + } + } + if (subProviders.size() > 1) { + throw new UnsupportedOperationException( + "Sub-field-level data evolution does not yet support splitting a " + + "nested sub-field (" + + rf.name() + + "." + + sub.name() + + ") across multiple files."); + } + if (subProviders.size() == 1) { + if (sub.type() instanceof RowType && coveredSubLeaves < subLeaves.size()) { + // the single provider holds only part of this nested sub-struct; + // reading + // it whole would request leaves it lacks, and one-level composition + // cannot prune deeper than this level yet + throw new UnsupportedOperationException( + "Sub-field-level data evolution does not yet support reading a " + + "partially-written nested sub-field (" + + rf.name() + + "." + + sub.name() + + ") deeper than one level."); + } + int b = subProviders.iterator().next(); + bunchSelection + .get(b) + .computeIfAbsent(rf.id(), k -> new LinkedHashSet<>()) + .add(sub.id()); + } + // else: sub-field absent everywhere -> stays null + } + } + } + + // materialize each bunch's partial read row type and the offset maps. 
+ List> bunchReadFields = new ArrayList<>(); + List> bunchTopOffset = new ArrayList<>(); + List>> bunchSubOffset = new ArrayList<>(); + for (int i = 0; i < numBunches; i++) { + Map> sel = bunchSelection.get(i); + List readFields = new ArrayList<>(); + Map topOffset = new HashMap<>(); + Map> subOffset = new HashMap<>(); + for (Map.Entry> e : sel.entrySet()) { + int topId = e.getKey(); + Set subs = e.getValue(); + DataField readTop = readRowType.getField(topId); + if (subs == null) { + readFields.add(readTop); + topOffset.put(topId, readFields.size() - 1); + } else { + RowType readStruct = (RowType) readTop.type(); + List chosen = new ArrayList<>(); + Map subToIdx = new HashMap<>(); + for (DataField s : readStruct.getFields()) { + if (subs.contains(s.id())) { + subToIdx.put(s.id(), chosen.size()); + chosen.add(s); + } + } + RowType partial = new RowType(readStruct.isNullable(), chosen); + readFields.add(readTop.newType(partial)); + topOffset.put(topId, readFields.size() - 1); + subOffset.put(topId, subToIdx); + } + } + bunchReadFields.add(readFields); + bunchTopOffset.add(topOffset); + bunchSubOffset.add(subOffset); + } + + // wire output offsets (whole fields) and nested composition plans (split structs). 
+ for (int j = 0; j < numFields; j++) { + DataField rf = allReadFields.get(j); + if (composite[j]) { + List subFields = ((RowType) rf.type()).getFields(); + int subCount = subFields.size(); + int[] subRowOffsets = new int[subCount]; + int[] subFieldOffsets = new int[subCount]; + Arrays.fill(subRowOffsets, -1); + Arrays.fill(subFieldOffsets, -1); + Map bunchToPartial = new LinkedHashMap<>(); + List partials = new ArrayList<>(); + for (int s = 0; s < subCount; s++) { + int subId = subFields.get(s).id(); + int b = findSubProvider(rf.id(), subId, bunchSubOffset); + if (b < 0) { + // no file provides this sub-field; it stays null, so it must be nullable + checkArgument( + subFields.get(s).type().isNullable(), + "Sub-field %s.%s is not null but can't find any file contains it.", + rf.name(), + subFields.get(s).name()); + continue; + } + Integer pIdx = bunchToPartial.get(b); + if (pIdx == null) { + int topOff = bunchTopOffset.get(b).get(rf.id()); + int size = bunchSubOffset.get(b).get(rf.id()).size(); + pIdx = partials.size(); + bunchToPartial.put(b, pIdx); + partials.add(new int[] {b, topOff, size}); + } + subRowOffsets[s] = pIdx; + subFieldOffsets[s] = bunchSubOffset.get(b).get(rf.id()).get(subId); + } + int p = partials.size(); + int[] pr = new int[p]; + int[] po = new int[p]; + int[] ps = new int[p]; + for (int k = 0; k < p; k++) { + pr[k] = partials.get(k)[0]; + po[k] = partials.get(k)[1]; + ps[k] = partials.get(k)[2]; + } + nested[j] = + new DataEvolutionRow.NestedField( + pr, po, ps, subRowOffsets, subFieldOffsets); + } else if (wholeBunch[j] >= 0) { + int b = wholeBunch[j]; + rowOffsets[j] = b; + fieldOffsets[j] = bunchTopOffset.get(b).get(rf.id()); + } + } + + return new DataEvolutionReadPlan(rowOffsets, fieldOffsets, nested, bunchReadFields); + } + + /** Collect (recursively) the leaf field ids of {@code fields}; only ROW types recurse. 
*/ + private static void collectLeafIds(List fields, Collection out) { + for (DataField f : fields) { + if (f.type() instanceof RowType) { + collectLeafIds(((RowType) f.type()).getFields(), out); + } else { + out.add(f.id()); + } + } + } + + private static List leafIdsOf(DataField field) { + List result = new ArrayList<>(); + collectLeafIds(Collections.singletonList(field), result); + return result; + } + + private static int providerOf(int leafId, List> bunchLeaves) { + for (int i = 0; i < bunchLeaves.size(); i++) { + if (bunchLeaves.get(i).contains(leafId)) { + return i; + } + } + return -1; + } + + private static int findSubProvider( + int topId, int subId, List>> bunchSubOffset) { + for (int b = 0; b < bunchSubOffset.size(); b++) { + Map sm = bunchSubOffset.get(b).get(topId); + if (sm != null && sm.containsKey(subId)) { + return b; + } + } + return -1; + } + + /** + * Immutable result of {@link DataEvolutionReadPlanner#plan()}: how each read field is sourced + * (whole via {@code rowOffsets}/{@code fieldOffsets}, or composed via {@code nested}) and, per + * bunch, the (possibly partial nested) fields to physically read. 
+ */ + static class DataEvolutionReadPlan { + + // per read field: the bunch a whole field is taken from (-1 if absent or composed) + final int[] rowOffsets; + // per read field: the field offset within that bunch's read row (-1 if absent or composed) + final int[] fieldOffsets; + // per read field: the sub-field assembly plan for a struct split across files (null if + // whole) + final DataEvolutionRow.NestedField[] nested; + // per bunch: the fields (with partial nested structs) to read from that file + final List> bunchReadFields; + + DataEvolutionReadPlan( + int[] rowOffsets, + int[] fieldOffsets, + DataEvolutionRow.NestedField[] nested, + List> bunchReadFields) { + this.rowOffsets = rowOffsets; + this.fieldOffsets = fieldOffsets; + this.nested = nested; + this.bunchReadFields = bunchReadFields; + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java index 31ff6cd61404..b35afefccb4c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java @@ -43,7 +43,6 @@ import org.apache.paimon.reader.RecordReader; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; -import org.apache.paimon.table.SpecialFields; import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.Split; import org.apache.paimon.types.DataField; @@ -61,7 +60,6 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -250,89 +248,75 @@ private DataEvolutionFileReader createUnionReader( } } - // Init all we need to create a compound reader + // Init all we need to create a compound reader: resolve each bunch's physically-provided + // (row-tracked) row type, then delegate 
the no-IO layout planning (leaf-level matching and + // nested sub-field assembly) to DataEvolutionReadPlanner; this class only builds readers. List allReadFields = readRowType.getFields(); - RecordReader[] fileRecordReaders = new RecordReader[fieldsFiles.size()]; - int[] readFieldIndex = allReadFields.stream().mapToInt(DataField::id).toArray(); - // which row the read field index belongs to - int[] rowOffsets = new int[allReadFields.size()]; - // which field index in the reading row - int[] fieldOffsets = new int[allReadFields.size()]; - Arrays.fill(rowOffsets, -1); - Arrays.fill(fieldOffsets, -1); - - for (int i = 0; i < fieldsFiles.size(); i++) { + int numBunches = fieldsFiles.size(); + RecordReader[] fileRecordReaders = new RecordReader[numBunches]; + + List bunchAvailTypes = new ArrayList<>(); + for (FieldBunch fieldBunch : fieldsFiles) { + DataFileMeta first = fieldBunch.files().get(0); + TableSchema dataSchema = + schemaFetcher.apply(first.schemaId()).project(first.writeCols()); + bunchAvailTypes.add(rowTypeWithRowTracking(dataSchema.logicalRowType())); + } + DataEvolutionReadPlanner.DataEvolutionReadPlan plan = + new DataEvolutionReadPlanner(readRowType, bunchAvailTypes).plan(); + + // Build the per-bunch readers from the planned partial read row types. 
+ for (int i = 0; i < numBunches; i++) { + List readFields = plan.bunchReadFields.get(i); + if (readFields.isEmpty()) { + fileRecordReaders[i] = null; + continue; + } FieldBunch bunch = fieldsFiles.get(i); DataFileMeta firstFile = bunch.files().get(0); FileReadTarget readTarget = readTarget(firstFile, dataFilePathFactory, rowRanges); String formatIdentifier = readTarget.formatIdentifier; long schemaId = firstFile.schemaId(); TableSchema dataSchema = schemaFetcher.apply(schemaId).project(firstFile.writeCols()); - int[] fieldIds = - SpecialFields.rowTypeWithRowTracking(dataSchema.logicalRowType()).getFields() - .stream() - .mapToInt(DataField::id) - .toArray(); - List readFields = new ArrayList<>(); - for (int j = 0; j < readFieldIndex.length; j++) { - for (int fieldId : fieldIds) { - // Check if the read field index matches the file field - // index - if (readFieldIndex[j] == fieldId) { - // If the row offset is not set, set it to the current - // file reader - if (rowOffsets[j] == -1) { - // "i" is the reader index, and "readFields.size()" - // is the offset the that row - rowOffsets[j] = i; - fieldOffsets[j] = readFields.size(); - readFields.add(allReadFields.get(j)); - } - break; - } - } - } - - if (readFields.isEmpty()) { - fileRecordReaders[i] = null; - } else { - // create new FormatReaderMapping for read partial fields - List readFieldNames = - readFields.stream().map(DataField::name).collect(Collectors.toList()); - FormatReaderMapping formatReaderMapping = - formatReaderMappings.computeIfAbsent( - new FormatKey(schemaId, formatIdentifier, readFieldNames), - key -> - formatBuilder.build( - formatIdentifier, - schema, - dataSchema, - readFields, - false)); - RowType partialReadRowType = new RowType(readFields); - fileRecordReaders[i] = - new ForceSingleBatchReader( - createFieldBunchReader( - partition, - bunch, - dataFilePathFactory, - formatReaderMapping, - rowRanges, - partialReadRowType)); - } + RowType partialReadRowType = new RowType(readFields); + // 
cache key must use paths relative to the full schema so that two files reading + // different sub-fields of the same struct (e.g. nest.a vs nest.b) do not collide + RowType fullRef = + rowTypeWithRowTracking(schemaFetcher.apply(schemaId).logicalRowType()); + List readFieldNames = partialReadRowType.leafPaths(fullRef); + FormatReaderMapping formatReaderMapping = + formatReaderMappings.computeIfAbsent( + new FormatKey(schemaId, formatIdentifier, readFieldNames), + key -> + formatBuilder.build( + formatIdentifier, + schema, + dataSchema, + readFields, + false)); + fileRecordReaders[i] = + new ForceSingleBatchReader( + createFieldBunchReader( + partition, + bunch, + dataFilePathFactory, + formatReaderMapping, + rowRanges, + partialReadRowType)); } - for (int i = 0; i < rowOffsets.length; i++) { - if (rowOffsets[i] == -1) { + for (int j = 0; j < allReadFields.size(); j++) { + if (plan.rowOffsets[j] == -1 && plan.nested[j] == null) { checkArgument( - allReadFields.get(i).type().isNullable(), + allReadFields.get(j).type().isNullable(), format( "Field %s is not null but can't find any file contains it.", - allReadFields.get(i))); + allReadFields.get(j))); } } - return new DataEvolutionFileReader(rowOffsets, fieldOffsets, fileRecordReaders); + return new DataEvolutionFileReader( + plan.rowOffsets, plan.fieldOffsets, fileRecordReaders, plan.nested); } private RecordReader createFieldBunchReader( diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowIdColumnConflictChecker.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowIdColumnConflictChecker.java index b2f8740f5269..30a3999b3e6c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowIdColumnConflictChecker.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowIdColumnConflictChecker.java @@ -22,6 +22,7 @@ import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.SpecialFields; import 
org.apache.paimon.types.DataField; +import org.apache.paimon.types.RowType; import org.apache.paimon.utils.Range; import org.apache.paimon.utils.RangeHelper; @@ -50,7 +51,7 @@ public class RowIdColumnConflictChecker { private final SchemaManager schemaManager; private final List writeRanges; - private final Map> fieldIdByNameCache = new HashMap<>(); + private final Map rowTypeCache = new HashMap<>(); private RowIdColumnConflictChecker(SchemaManager schemaManager, List deltaFiles) { this.schemaManager = schemaManager; @@ -96,18 +97,13 @@ private List buildWriteRanges(List deltaFiles) { private void addWriteFieldIds(Set fieldIds, DataFileMeta file) { List writeCols = file.writeCols(); if (writeCols == null) { - fieldIds.addAll( - fieldIdByNameCache - .computeIfAbsent(file.schemaId(), this::fieldIdByName) - .values()); + // full-schema write touches every leaf field + collectLeafIds(rowType(file.schemaId()).getFields(), fieldIds); return; } for (String writeCol : writeCols) { - Integer fieldId = fieldId(file, writeCol); - if (fieldId != null) { - fieldIds.add(fieldId); - } + fieldIds.addAll(leafFieldIds(file.schemaId(), writeCol)); } } @@ -185,37 +181,54 @@ private boolean containsAnyWriteField(Set fieldIds, DataFileMeta file) } for (String writeCol : writeCols) { - Integer fieldId = fieldId(file, writeCol); - if (fieldId != null && fieldIds.contains(fieldId)) { - return true; + for (Integer fieldId : leafFieldIds(file.schemaId(), writeCol)) { + if (fieldIds.contains(fieldId)) { + return true; + } } } return false; } - private Integer fieldId(DataFileMeta file, String writeCol) { - Integer fieldId = - fieldIdByNameCache - .computeIfAbsent(file.schemaId(), this::fieldIdByName) - .get(writeCol); - if (fieldId == null) { - if (SpecialFields.isSystemField(writeCol)) { - return null; - } + /** + * Resolve a (possibly nested, dotted) write column such as {@code "nest.a"} to the set of leaf + * field ids it covers. A whole top-level struct column (e.g. 
{@code "nest"}) expands to all of + * its leaf ids, so a whole-struct write and a sub-field write of the same struct still + * conflict. + */ + private List leafFieldIds(long schemaId, String writeCol) { + if (SpecialFields.isSystemField(writeCol)) { + return Collections.emptyList(); + } + // projectByPaths handles both plain top-level names and dotted nested paths, and throws if + // the path does not exist in the schema + RowType projected; + try { + projected = rowType(schemaId).projectByPaths(Collections.singletonList(writeCol)); + } catch (IllegalArgumentException e) { throw new RuntimeException( String.format( - "Cannot find write column '%s' in schema %s.", - writeCol, file.schemaId())); + "Cannot find write column '%s' in schema %s.", writeCol, schemaId), + e); } - return fieldId; + List ids = new ArrayList<>(); + collectLeafIds(projected.getFields(), ids); + return ids; } - private Map fieldIdByName(long schemaId) { - Map fieldIdByName = new HashMap<>(); - for (DataField field : schemaManager.schema(schemaId).logicalRowType().getFields()) { - fieldIdByName.put(field.name(), field.id()); + private static void collectLeafIds(List fields, java.util.Collection out) { + for (DataField field : fields) { + if (field.type() instanceof RowType) { + collectLeafIds(((RowType) field.type()).getFields(), out); + } else { + out.add(field.id()); + } } - return fieldIdByName; + } + + private RowType rowType(long schemaId) { + return rowTypeCache.computeIfAbsent( + schemaId, id -> schemaManager.schema(id).logicalRowType()); } /** Range and field id Set. 
*/ diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadPlannerTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadPlannerTest.java new file mode 100644 index 000000000000..2be048b8cb4d --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadPlannerTest.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.operation; + +import org.apache.paimon.operation.DataEvolutionReadPlanner.DataEvolutionReadPlan; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link DataEvolutionReadPlanner} (pure, no-IO layout planning). 
*/ +class DataEvolutionReadPlannerTest { + + // read type: id INT, nest ROW + private static final RowType READ_TYPE = + new RowType( + Arrays.asList( + new DataField(0, "id", DataTypes.INT()), + new DataField( + 1, + "nest", + DataTypes.ROW( + new DataField(2, "a", DataTypes.INT()), + new DataField(3, "b", DataTypes.STRING()))))); + + private static RowType nest(DataField... subFields) { + return new RowType( + Collections.singletonList(new DataField(1, "nest", DataTypes.ROW(subFields)))); + } + + @Test + void testStructSplitAcrossFilesIsComposed() { + // bunch0 (latest) provides nest.a; bunch1 provides id + nest.b + RowType avail0 = nest(new DataField(2, "a", DataTypes.INT())); + RowType avail1 = + new RowType( + Arrays.asList( + new DataField(0, "id", DataTypes.INT()), + new DataField( + 1, + "nest", + DataTypes.ROW(new DataField(3, "b", DataTypes.STRING()))))); + + DataEvolutionReadPlan plan = + new DataEvolutionReadPlanner(READ_TYPE, Arrays.asList(avail0, avail1)).plan(); + + // id is taken whole from bunch1 + assertThat(plan.nested[0]).isNull(); + assertThat(plan.rowOffsets[0]).isEqualTo(1); + // nest is composed across files (a from bunch0, b from bunch1) + assertThat(plan.rowOffsets[1]).isEqualTo(-1); + assertThat(plan.nested[1]).isNotNull(); + // each bunch only physically reads what it provides + assertThat(plan.bunchReadFields.get(0)).hasSize(1); // nest + assertThat(plan.bunchReadFields.get(1)).hasSize(2); // id, nest + } + + @Test + void testStructWholeFromSingleFile() { + // bunch0 provides the whole nest; bunch1 provides id + RowType avail0 = + nest( + new DataField(2, "a", DataTypes.INT()), + new DataField(3, "b", DataTypes.STRING())); + RowType avail1 = + new RowType(Collections.singletonList(new DataField(0, "id", DataTypes.INT()))); + + DataEvolutionReadPlan plan = + new DataEvolutionReadPlanner(READ_TYPE, Arrays.asList(avail0, avail1)).plan(); + + // nest is taken whole from bunch0, not composed + assertThat(plan.nested[1]).isNull(); + 
assertThat(plan.rowOffsets[1]).isEqualTo(0); + assertThat(plan.rowOffsets[0]).isEqualTo(1); + } + + @Test + void testSubFieldAbsentEverywhereStaysNullWhenNullable() { + // only nest.a is provided anywhere; nest.b (nullable) is absent + RowType avail0 = nest(new DataField(2, "a", DataTypes.INT())); + RowType avail1 = + new RowType(Collections.singletonList(new DataField(0, "id", DataTypes.INT()))); + + DataEvolutionReadPlan plan = + new DataEvolutionReadPlanner(READ_TYPE, Arrays.asList(avail0, avail1)).plan(); + + // nest still composed (only a present), no exception since b is nullable + assertThat(plan.nested[1]).isNotNull(); + assertThat(plan.bunchReadFields.get(0)).hasSize(1); + } + + @Test + void testDeeperThanOneLevelSplitThrows() { + // read type: nest ROW>; x and y provided by different files + RowType readType = + new RowType( + Collections.singletonList( + new DataField( + 1, + "nest", + DataTypes.ROW( + new DataField( + 4, + "sub", + DataTypes.ROW( + new DataField( + 5, "x", DataTypes.INT()), + new DataField( + 6, + "y", + DataTypes.INT()))))))); + RowType avail0 = + new RowType( + Collections.singletonList( + new DataField( + 1, + "nest", + DataTypes.ROW( + new DataField( + 4, + "sub", + DataTypes.ROW( + new DataField( + 5, + "x", + DataTypes.INT()))))))); + RowType avail1 = + new RowType( + Collections.singletonList( + new DataField( + 1, + "nest", + DataTypes.ROW( + new DataField( + 4, + "sub", + DataTypes.ROW( + new DataField( + 6, + "y", + DataTypes.INT()))))))); + + assertThatThrownBy( + () -> + new DataEvolutionReadPlanner( + readType, Arrays.asList(avail0, avail1)) + .plan()) + .isInstanceOf(UnsupportedOperationException.class); + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/commit/RowIdColumnConflictCheckerTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/commit/RowIdColumnConflictCheckerTest.java index 8c45bfbb33ad..76ef065b6268 100644 --- 
a/paimon-core/src/test/java/org/apache/paimon/operation/commit/RowIdColumnConflictCheckerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/commit/RowIdColumnConflictCheckerTest.java @@ -114,6 +114,43 @@ void testFailsOnUnknownNonSystemWriteColumn() { .hasMessageContaining("Cannot find write column 'missing'"); } + @Test + void testSubFieldDisjointLeavesDoNotConflict() { + // schema 2: id INT, nest ROW + RowIdColumnConflictChecker checker = + checker(file("current", 0L, 10L, 2L, Arrays.asList("nest.a"))); + + assertThat(checker.conflictsWith(file("historical", 0L, 10L, 2L, Arrays.asList("nest.b")))) + .isFalse(); + } + + @Test + void testSubFieldSameLeafConflicts() { + RowIdColumnConflictChecker checker = + checker(file("current", 0L, 10L, 2L, Arrays.asList("nest.a"))); + + assertThat(checker.conflictsWith(file("historical", 0L, 10L, 2L, Arrays.asList("nest.a")))) + .isTrue(); + } + + @Test + void testWholeStructConflictsWithSubField() { + // a whole-struct write expands to all of its leaves, so it conflicts with a sub-field write + RowIdColumnConflictChecker checker = + checker(file("current", 0L, 10L, 2L, Arrays.asList("nest"))); + + assertThat(checker.conflictsWith(file("historical", 0L, 10L, 2L, Arrays.asList("nest.a")))) + .isTrue(); + } + + @Test + void testFullSchemaWriteConflictsWithSubField() { + RowIdColumnConflictChecker checker = checker(file("current", 0L, 10L, 2L, null)); + + assertThat(checker.conflictsWith(file("historical", 0L, 10L, 2L, Arrays.asList("nest.a")))) + .isTrue(); + } + private RowIdColumnConflictChecker checker(DataFileMeta... 
files) { return RowIdColumnConflictChecker.fromDataFiles( createSchemaManager(), Arrays.asList(files)); @@ -170,6 +207,23 @@ private SchemaManager createSchemaManager() { Collections.singletonList("id"), Collections.emptyMap(), ""))); + schemas.put( + 2L, + org.apache.paimon.schema.TableSchema.create( + 2L, + new Schema( + Arrays.asList( + new DataField(0, "id", DataTypes.INT()), + new DataField( + 1, + "nest", + DataTypes.ROW( + new DataField(2, "a", DataTypes.INT()), + new DataField(3, "b", DataTypes.INT())))), + Collections.emptyList(), + Collections.singletonList("id"), + Collections.emptyMap(), + ""))); return new TestingSchemaManager( new Path("/tmp/row-id-column-conflict-checker-test"), schemas); } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/NestedDataEvolutionTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/NestedDataEvolutionTableTest.java new file mode 100644 index 000000000000..e507343f38b6 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/table/NestedDataEvolutionTableTest.java @@ -0,0 +1,374 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.table; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.append.dataevolution.DataEvolutionCompactCoordinator; +import org.apache.paimon.append.dataevolution.DataEvolutionCompactTask; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericArray; +import org.apache.paimon.data.GenericMap; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.reader.DataEvolutionFileReader; +import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.table.sink.BatchTableCommit; +import org.apache.paimon.table.sink.BatchTableWrite; +import org.apache.paimon.table.sink.BatchWriteBuilder; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.source.EndOfScanException; +import org.apache.paimon.table.source.ReadBuilder; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** + * Tests for data-evolution + row-tracking with nested columns (ROW / ARRAY / MAP). + * + *

These tests verify that a nested column is handled as a single, indivisible top-level field by + * data evolution: it can live in its own column-group file and be merged back by field id, while + * {@code _ROW_ID} / {@code _SEQUENCE_NUMBER} stay aligned by {@code firstRowId + position}. They + * also pin down the limitation that sub-fields of a nested ROW cannot be split into a separate + * column group (the minimum granularity is a top-level column). + */ +public class NestedDataEvolutionTableTest extends DataEvolutionTestBase { + + // f0(0) INT, f1(1) STRING, nest(2) ROW, arr(3) ARRAY, mp(4) + // MAP + @Override + protected Schema schemaDefault() { + Schema.Builder b = Schema.newBuilder(); + b.column("f0", DataTypes.INT()); + b.column("f1", DataTypes.STRING()); + b.column("nest", DataTypes.ROW(DataTypes.INT(), DataTypes.STRING())); + b.column("arr", DataTypes.ARRAY(DataTypes.INT())); + b.column("mp", DataTypes.MAP(DataTypes.STRING(), DataTypes.INT())); + b.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true"); + b.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true"); + return b.build(); + } + + private static GenericRow nestOf(int a, String bStr) { + return GenericRow.of(a, BinaryString.fromString(bStr)); + } + + private static GenericArray arrOf(int... values) { + return new GenericArray(values); + } + + private static GenericMap mapOf(String k, int v) { + Map m = new HashMap<>(); + m.put(BinaryString.fromString(k), v); + return new GenericMap(m); + } + + /** G1 + G2: nested column as its own column group, merged by field id, row-id aligned. 
*/ + @Test + public void testNestedColumnGroupMerge() throws Exception { + createTableDefault(); + Schema schema = schemaDefault(); + int n = 50; + + RowType cgA = schema.rowType().project(Arrays.asList("f0", "f1")); + RowType cgB = schema.rowType().project(Collections.singletonList("nest")); + RowType cgC = schema.rowType().project(Arrays.asList("arr", "mp")); + BatchWriteBuilder builder = getTableDefault().newBatchWriteBuilder(); + + // column group A: {f0, f1} + try (BatchTableWrite w = builder.newWrite().withWriteType(cgA)) { + for (int i = 0; i < n; i++) { + w.write(GenericRow.of(i, BinaryString.fromString("a" + i))); + } + BatchTableCommit commit = builder.newCommit(); + commit.commit(w.prepareCommit()); + } + + // column group B: {nest} + try (BatchTableWrite w = builder.newWrite().withWriteType(cgB)) { + for (int i = 0; i < n; i++) { + w.write(GenericRow.of(nestOf(i, "n" + i))); + } + BatchTableCommit commit = builder.newCommit(); + List commitables = w.prepareCommit(); + setFirstRowId(commitables, 0L); + commit.commit(commitables); + } + + // column group C: {arr, mp} + try (BatchTableWrite w = builder.newWrite().withWriteType(cgC)) { + for (int i = 0; i < n; i++) { + w.write(GenericRow.of(arrOf(i, i + 1), mapOf("k" + i, i))); + } + BatchTableCommit commit = builder.newCommit(); + List commitables = w.prepareCommit(); + setFirstRowId(commitables, 0L); + commit.commit(commitables); + } + + ReadBuilder readBuilder = getTableDefault().newReadBuilder(); + RecordReader reader = + readBuilder.newRead().createReader(readBuilder.newScan().plan()); + assertThat(reader).isInstanceOf(DataEvolutionFileReader.class); + + AtomicInteger idx = new AtomicInteger(0); + reader.forEachRemaining( + r -> { + int i = idx.getAndIncrement(); + assertThat(r.getInt(0)).isEqualTo(i); + assertThat(r.getString(1).toString()).isEqualTo("a" + i); + InternalRow nest = r.getRow(2, 2); + assertThat(nest.getInt(0)).isEqualTo(i); + assertThat(nest.getString(1).toString()).isEqualTo("n" + i); 
+ assertThat(r.getArray(3).getInt(0)).isEqualTo(i); + assertThat(r.getArray(3).getInt(1)).isEqualTo(i + 1); + assertThat(r.getMap(4).keyArray().getString(0).toString()).isEqualTo("k" + i); + assertThat(r.getMap(4).valueArray().getInt(0)).isEqualTo(i); + }); + assertThat(idx.get()).isEqualTo(n); + + // _ROW_ID is contiguous 0..n-1 across the merged column groups. + List rowIds = readRowIds(); + assertThat(rowIds).hasSize(n); + for (int i = 0; i < n; i++) { + assertThat(rowIds.get(i)).isEqualTo((long) i); + } + } + + /** + * G3: a later snapshot rewrites only the nested column group; other columns keep old values. + */ + @Test + public void testNestedColumnLateOverwrite() throws Exception { + createTableDefault(); + Schema schema = schemaDefault(); + int n = 20; + + BatchWriteBuilder builder = getTableDefault().newBatchWriteBuilder(); + // full write + try (BatchTableWrite w = builder.newWrite().withWriteType(schema.rowType())) { + for (int i = 0; i < n; i++) { + w.write( + GenericRow.of( + i, + BinaryString.fromString("a" + i), + nestOf(i, "old" + i), + arrOf(i), + mapOf("k" + i, i))); + } + BatchTableCommit commit = builder.newCommit(); + commit.commit(w.prepareCommit()); + } + + // later snapshot: overwrite only the nest column group, aligned to the same row-id range + RowType cgNest = schema.rowType().project(Collections.singletonList("nest")); + try (BatchTableWrite w = builder.newWrite().withWriteType(cgNest)) { + for (int i = 0; i < n; i++) { + w.write(GenericRow.of(nestOf(i * 10, "new" + i))); + } + BatchTableCommit commit = builder.newCommit(); + List commitables = w.prepareCommit(); + setFirstRowId(commitables, 0L); + commit.commit(commitables); + } + + ReadBuilder readBuilder = getTableDefault().newReadBuilder(); + RecordReader reader = + readBuilder.newRead().createReader(readBuilder.newScan().plan()); + AtomicInteger idx = new AtomicInteger(0); + reader.forEachRemaining( + r -> { + int i = idx.getAndIncrement(); + // untouched columns keep old values + 
assertThat(r.getInt(0)).isEqualTo(i); + assertThat(r.getString(1).toString()).isEqualTo("a" + i); + assertThat(r.getArray(3).getInt(0)).isEqualTo(i); + // nest column reflects the new values + InternalRow nest = r.getRow(2, 2); + assertThat(nest.getInt(0)).isEqualTo(i * 10); + assertThat(nest.getString(1).toString()).isEqualTo("new" + i); + }); + assertThat(idx.get()).isEqualTo(n); + + // row ids unchanged + List rowIds = readRowIds(); + for (int i = 0; i < n; i++) { + assertThat(rowIds.get(i)).isEqualTo((long) i); + } + } + + /** G4: projection covering special fields and nested columns. */ + @Test + public void testProjectionWithNested() throws Exception { + createTableDefault(); + Schema schema = schemaDefault(); + int n = 10; + + BatchWriteBuilder builder = getTableDefault().newBatchWriteBuilder(); + try (BatchTableWrite w = builder.newWrite().withWriteType(schema.rowType())) { + for (int i = 0; i < n; i++) { + w.write( + GenericRow.of( + i, + BinaryString.fromString("a" + i), + nestOf(i, "n" + i), + arrOf(i), + mapOf("k" + i, i))); + } + BatchTableCommit commit = builder.newCommit(); + commit.commit(w.prepareCommit()); + } + + // project only the nested column (keep the real "nest" field id via project) + ReadBuilder readBuilder = getTableDefault().newReadBuilder(); + RecordReader reader = + readBuilder + .withReadType( + getTableDefault() + .rowType() + .project(Collections.singletonList("nest"))) + .newRead() + .createReader(readBuilder.newScan().plan()); + AtomicInteger idx = new AtomicInteger(0); + reader.forEachRemaining( + r -> { + int i = idx.getAndIncrement(); + InternalRow nest = r.getRow(0, 2); + assertThat(nest.getInt(0)).isEqualTo(i); + assertThat(nest.getString(1).toString()).isEqualTo("n" + i); + }); + assertThat(idx.get()).isEqualTo(n); + + // project only _ROW_ID + assertThat(readRowIds()).hasSize(n); + } + + /** + * G5 (limitation): a sub-field of a nested ROW cannot be addressed as a top-level write column, + * because data evolution splits 
at top-level column granularity. {@code RowType.project} + * matches only top-level field names, so a nested sub-field name has no match. + */ + @Test + public void testNestedSubFieldCannotBeSplit() { + RowType rowType = schemaDefault().rowType(); + // "a" / "b" are sub-fields inside "nest"; they are not top-level columns. + assertThatThrownBy(() -> rowType.project(Collections.singletonList("a"))) + .isInstanceOf(IndexOutOfBoundsException.class); + // a nested ROW can only be projected as a whole top-level column. + assertThat(rowType.project(Collections.singletonList("nest")).getFieldNames()) + .containsExactly("nest"); + } + + /** + * G6: compacting column-group files (one of which is the nested column) merges them into a + * single file; data values and row ids survive the {@code DataEvolutionRowIdReassigner}. + */ + @Test + public void testCompactionWithNested() throws Exception { + createTableDefault(); + Schema schema = schemaDefault(); + int n = 30; + + RowType cgA = schema.rowType().project(Arrays.asList("f0", "f1", "arr", "mp")); + RowType cgB = schema.rowType().project(Collections.singletonList("nest")); + BatchWriteBuilder builder = getTableDefault().newBatchWriteBuilder(); + + try (BatchTableWrite w = builder.newWrite().withWriteType(cgA)) { + for (int i = 0; i < n; i++) { + w.write( + GenericRow.of( + i, BinaryString.fromString("a" + i), arrOf(i), mapOf("k" + i, i))); + } + builder.newCommit().commit(w.prepareCommit()); + } + try (BatchTableWrite w = builder.newWrite().withWriteType(cgB)) { + for (int i = 0; i < n; i++) { + w.write(GenericRow.of(nestOf(i, "n" + i))); + } + List commitables = w.prepareCommit(); + setFirstRowId(commitables, 0L); + builder.newCommit().commit(commitables); + } + + // run data-evolution compaction via the coordinator (merges the two column groups) + FileStoreTable table = getTableDefault(); + DataEvolutionCompactCoordinator coordinator = + new DataEvolutionCompactCoordinator(table, false, false); + List commitMessages = new 
ArrayList<>(); + List tasks; + try { + while (!(tasks = coordinator.plan()).isEmpty()) { + for (DataEvolutionCompactTask task : tasks) { + commitMessages.add(task.doCompact(table, "nested-compact")); + } + } + } catch (EndOfScanException ignore) { + } + if (!commitMessages.isEmpty()) { + table.newBatchWriteBuilder().newCommit().commit(commitMessages); + } + + // data and row ids must be unchanged after compaction + ReadBuilder readBuilder = getTableDefault().newReadBuilder(); + RecordReader reader = + readBuilder.newRead().createReader(readBuilder.newScan().plan()); + AtomicInteger idx = new AtomicInteger(0); + reader.forEachRemaining( + r -> { + int i = idx.getAndIncrement(); + assertThat(r.getInt(0)).isEqualTo(i); + assertThat(r.getString(1).toString()).isEqualTo("a" + i); + InternalRow nest = r.getRow(2, 2); + assertThat(nest.getInt(0)).isEqualTo(i); + assertThat(nest.getString(1).toString()).isEqualTo("n" + i); + assertThat(r.getArray(3).getInt(0)).isEqualTo(i); + }); + assertThat(idx.get()).isEqualTo(n); + + List rowIds = readRowIds(); + assertThat(rowIds).hasSize(n); + for (int i = 0; i < n; i++) { + assertThat(rowIds.get(i)).isEqualTo((long) i); + } + } + + private List readRowIds() throws Exception { + ReadBuilder readBuilder = getTableDefault().newReadBuilder(); + RecordReader reader = + readBuilder + .withReadType(RowType.of(SpecialFields.ROW_ID)) + .newRead() + .createReader(readBuilder.newScan().plan()); + List rowIds = new ArrayList<>(); + reader.forEachRemaining(r -> rowIds.add(r.getLong(0))); + return rowIds; + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/table/NestedSubfieldDataEvolutionTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/NestedSubfieldDataEvolutionTableTest.java new file mode 100644 index 000000000000..0e33b1c46e41 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/table/NestedSubfieldDataEvolutionTableTest.java @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.table; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.append.dataevolution.DataEvolutionCompactCoordinator; +import org.apache.paimon.append.dataevolution.DataEvolutionCompactTask; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericArray; +import org.apache.paimon.data.GenericMap; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.table.sink.BatchTableCommit; +import org.apache.paimon.table.sink.BatchTableWrite; +import org.apache.paimon.table.sink.BatchWriteBuilder; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.sink.CommitMessageImpl; +import org.apache.paimon.table.source.EndOfScanException; +import org.apache.paimon.table.source.ReadBuilder; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import 
java.util.concurrent.atomic.AtomicInteger; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests for sub-field-level data evolution + row-tracking: updating a single sub-field of a + * nested ROW column by writing an incremental file that only contains that sub-field, aligned by + * row-id, and reading the struct back by assembling sub-fields from several files. + */ +public class NestedSubfieldDataEvolutionTableTest extends DataEvolutionTestBase { + + // f0(0) INT, f1(1) STRING, nest(2) ROW, arr(3) ARRAY, mp(4) + // MAP + @Override + protected Schema schemaDefault() { + Schema.Builder b = Schema.newBuilder(); + b.column("f0", DataTypes.INT()); + b.column("f1", DataTypes.STRING()); + b.column( + "nest", + DataTypes.ROW( + DataTypes.FIELD(0, "a", DataTypes.INT()), + DataTypes.FIELD(1, "b", DataTypes.STRING()))); + b.column("arr", DataTypes.ARRAY(DataTypes.INT())); + b.column("mp", DataTypes.MAP(DataTypes.STRING(), DataTypes.INT())); + b.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true"); + b.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true"); + return b.build(); + } + + private static GenericArray arrOf(int v) { + return new GenericArray(new int[] {v}); + } + + private static GenericMap mapOf(String k, int v) { + Map m = new HashMap<>(); + m.put(BinaryString.fromString(k), v); + return new GenericMap(m); + } + + private void commit(BatchWriteBuilder builder, List messages) throws Exception { + try (BatchTableCommit commit = builder.newCommit()) { + commit.commit(messages); + } + } + + /** + * Core: write sub-field {@code nest.a} and {@code nest.b} into two separate files (plus the + * rest of the columns in a third), then read the full struct assembled from all three. 
+ */ + @Test + public void testSubFieldGroupsAssembled() throws Exception { + createTableDefault(); + RowType full = getTableDefault().rowType(); + int n = 40; + + RowType cgRest = full.projectByPaths(Arrays.asList("f0", "f1", "arr", "mp")); + RowType cgA = full.projectByPaths(Collections.singletonList("nest.a")); + RowType cgB = full.projectByPaths(Collections.singletonList("nest.b")); + BatchWriteBuilder builder = getTableDefault().newBatchWriteBuilder(); + + try (BatchTableWrite w = builder.newWrite().withWriteType(cgRest)) { + for (int i = 0; i < n; i++) { + w.write( + GenericRow.of( + i, BinaryString.fromString("a" + i), arrOf(i), mapOf("k" + i, i))); + } + commit(builder, w.prepareCommit()); + } + try (BatchTableWrite w = builder.newWrite().withWriteType(cgA)) { + for (int i = 0; i < n; i++) { + w.write(GenericRow.of(GenericRow.of(i * 2))); + } + List messages = w.prepareCommit(); + // a file that only writes nest.a must record a nested dotted path + assertThat(((CommitMessageImpl) messages.get(0)).newFilesIncrement().newFiles()) + .allSatisfy(f -> assertThat(f.writeCols()).containsExactly("nest.a")); + setFirstRowId(messages, 0L); + commit(builder, messages); + } + try (BatchTableWrite w = builder.newWrite().withWriteType(cgB)) { + for (int i = 0; i < n; i++) { + w.write(GenericRow.of(GenericRow.of(BinaryString.fromString("b" + i)))); + } + List messages = w.prepareCommit(); + setFirstRowId(messages, 0L); + commit(builder, messages); + } + + ReadBuilder readBuilder = getTableDefault().newReadBuilder(); + RecordReader reader = + readBuilder.newRead().createReader(readBuilder.newScan().plan()); + AtomicInteger idx = new AtomicInteger(0); + reader.forEachRemaining( + r -> { + int i = idx.getAndIncrement(); + assertThat(r.getInt(0)).isEqualTo(i); + assertThat(r.getString(1).toString()).isEqualTo("a" + i); + // nest assembled: a from file A, b from file B + InternalRow nest = r.getRow(2, 2); + assertThat(nest.getInt(0)).isEqualTo(i * 2); + 
assertThat(nest.getString(1).toString()).isEqualTo("b" + i); + assertThat(r.getArray(3).getInt(0)).isEqualTo(i); + assertThat(r.getMap(4).valueArray().getInt(0)).isEqualTo(i); + }); + assertThat(idx.get()).isEqualTo(n); + + assertThat(readRowIds()).containsExactlyElementsOf(rangeLongs(n)); + } + + /** + * A later snapshot updates only {@code nest.a} via an incremental sub-field file; {@code + * nest.b} and all other columns keep their original values, row-ids unchanged. + */ + @Test + public void testSubFieldLateOverwrite() throws Exception { + createTableDefault(); + RowType full = getTableDefault().rowType(); + int n = 25; + + BatchWriteBuilder builder = getTableDefault().newBatchWriteBuilder(); + // full write: nest = (i, "old"+i) + try (BatchTableWrite w = builder.newWrite().withWriteType(full)) { + for (int i = 0; i < n; i++) { + w.write( + GenericRow.of( + i, + BinaryString.fromString("a" + i), + GenericRow.of(i, BinaryString.fromString("old" + i)), + arrOf(i), + mapOf("k" + i, i))); + } + commit(builder, w.prepareCommit()); + } + + // incremental: overwrite only nest.a + RowType cgA = full.projectByPaths(Collections.singletonList("nest.a")); + try (BatchTableWrite w = builder.newWrite().withWriteType(cgA)) { + for (int i = 0; i < n; i++) { + w.write(GenericRow.of(GenericRow.of(i + 1000))); + } + List messages = w.prepareCommit(); + setFirstRowId(messages, 0L); + commit(builder, messages); + } + + ReadBuilder readBuilder = getTableDefault().newReadBuilder(); + RecordReader reader = + readBuilder.newRead().createReader(readBuilder.newScan().plan()); + AtomicInteger idx = new AtomicInteger(0); + reader.forEachRemaining( + r -> { + int i = idx.getAndIncrement(); + assertThat(r.getInt(0)).isEqualTo(i); + InternalRow nest = r.getRow(2, 2); + assertThat(nest.getInt(0)).isEqualTo(i + 1000); // updated sub-field + assertThat(nest.getString(1).toString()).isEqualTo("old" + i); // kept + }); + assertThat(idx.get()).isEqualTo(n); + 
assertThat(readRowIds()).containsExactlyElementsOf(rangeLongs(n)); + } + + /** Compaction merges sub-field files into one full file; values and row-ids are preserved. */ + @Test + public void testCompactionMergesSubFields() throws Exception { + createTableDefault(); + RowType full = getTableDefault().rowType(); + int n = 20; + + RowType cgRest = full.projectByPaths(Arrays.asList("f0", "f1", "arr", "mp")); + RowType cgA = full.projectByPaths(Collections.singletonList("nest.a")); + RowType cgB = full.projectByPaths(Collections.singletonList("nest.b")); + BatchWriteBuilder builder = getTableDefault().newBatchWriteBuilder(); + + try (BatchTableWrite w = builder.newWrite().withWriteType(cgRest)) { + for (int i = 0; i < n; i++) { + w.write( + GenericRow.of( + i, BinaryString.fromString("a" + i), arrOf(i), mapOf("k" + i, i))); + } + commit(builder, w.prepareCommit()); + } + try (BatchTableWrite w = builder.newWrite().withWriteType(cgA)) { + for (int i = 0; i < n; i++) { + w.write(GenericRow.of(GenericRow.of(i * 2))); + } + List messages = w.prepareCommit(); + setFirstRowId(messages, 0L); + commit(builder, messages); + } + try (BatchTableWrite w = builder.newWrite().withWriteType(cgB)) { + for (int i = 0; i < n; i++) { + w.write(GenericRow.of(GenericRow.of(BinaryString.fromString("b" + i)))); + } + List messages = w.prepareCommit(); + setFirstRowId(messages, 0L); + commit(builder, messages); + } + + FileStoreTable table = getTableDefault(); + DataEvolutionCompactCoordinator coordinator = + new DataEvolutionCompactCoordinator(table, false, false); + List compactMessages = new ArrayList<>(); + List tasks; + try { + while (!(tasks = coordinator.plan()).isEmpty()) { + for (DataEvolutionCompactTask task : tasks) { + compactMessages.add(task.doCompact(table, "subfield-compact")); + } + } + } catch (EndOfScanException ignore) { + } + if (!compactMessages.isEmpty()) { + commit(table.newBatchWriteBuilder(), compactMessages); + } + + ReadBuilder readBuilder = 
getTableDefault().newReadBuilder(); + RecordReader reader = + readBuilder.newRead().createReader(readBuilder.newScan().plan()); + AtomicInteger idx = new AtomicInteger(0); + reader.forEachRemaining( + r -> { + int i = idx.getAndIncrement(); + assertThat(r.getInt(0)).isEqualTo(i); + InternalRow nest = r.getRow(2, 2); + assertThat(nest.getInt(0)).isEqualTo(i * 2); + assertThat(nest.getString(1).toString()).isEqualTo("b" + i); + }); + assertThat(idx.get()).isEqualTo(n); + assertThat(readRowIds()).containsExactlyElementsOf(rangeLongs(n)); + } + + private List rangeLongs(int n) { + List out = new ArrayList<>(); + for (long i = 0; i < n; i++) { + out.add(i); + } + return out; + } + + private List readRowIds() throws Exception { + ReadBuilder readBuilder = getTableDefault().newReadBuilder(); + RecordReader reader = + readBuilder + .withReadType(RowType.of(SpecialFields.ROW_ID)) + .newRead() + .createReader(readBuilder.newScan().plan()); + List rowIds = new ArrayList<>(); + reader.forEachRemaining(r -> rowIds.add(r.getLong(0))); + return rowIds; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java index 40c89113803c..dca1cea0eaed 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java @@ -66,8 +66,10 @@ import javax.annotation.Nullable; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -129,6 +131,10 @@ public class DataEvolutionMergeIntoAction extends TableActionBase { // the snapshot id this action based on private long baseSnapshotId; + // columns written by 
this merge, as (possibly nested) dotted paths, e.g. ["id", "nest.a"]; + // derived in buildSource() and consumed by writePartialColumns(). + private List writePaths; + public DataEvolutionMergeIntoAction( String databaseName, String tableName, Map catalogConfig) { super(databaseName, tableName, catalogConfig); @@ -250,32 +256,15 @@ public Tuple2, RowType> buildSource() { handleSqls(); // assign row id for each source row + boolean updateAll = matchedUpdateSet.equals("*"); List project; - if (matchedUpdateSet.equals("*")) { + if (updateAll) { // if sourceName is qualified like 'default.S', we should build a project like S.* project = Collections.singletonList(sourceTableName() + ".*"); } else { - // validate upsert changes - Map changes = parseCommaSeparatedKeyValues(matchedUpdateSet); - for (String targetField : changes.keySet()) { - if (!targetFieldNames.contains(extractFieldName(targetField))) { - throw new RuntimeException( - String.format( - "Invalid column reference '%s' of table '%s' at matched-upsert action.", - targetField, identifier.getFullName())); - } - } - - // rename source table's selected columns according to SET statement - project = - changes.entrySet().stream() - .map( - entry -> - String.format( - "%s AS `%s`", - entry.getValue(), - extractFieldName(entry.getKey()))) - .collect(Collectors.toList()); + // validate upsert changes and build the projection (top-level columns and, for + // sub-field-level data evolution, partial nested structs via dotted paths) + project = buildExplicitProject(); } String query; @@ -316,13 +305,169 @@ public Tuple2, RowType> buildSource() { Table source = batchTEnv.sqlQuery(query); checkSchema(source); - RowType sourceType = - SpecialFields.rowTypeWithRowId(table.rowType()) - .project(source.getResolvedSchema().getColumnNames()); + + RowType sourceType; + if (updateAll) { + List columnNames = source.getResolvedSchema().getColumnNames(); + sourceType = 
SpecialFields.rowTypeWithRowId(table.rowType()).project(columnNames); + writePaths = + columnNames.stream() + .filter(name -> !SpecialFields.ROW_ID.name().equals(name)) + .collect(Collectors.toList()); + } else { + // build the source type manually so _ROW_ID is first and the column order matches the + // SQL projection order; for nested columns the field is the partial (pruned) struct. + RowType pruned = table.rowType().projectByPaths(writePaths); + List srcFields = new ArrayList<>(); + srcFields.add(SpecialFields.ROW_ID); + for (String topCol : explicitTopColumnOrder(writePaths)) { + srcFields.add(pruned.getField(table.rowType().getField(topCol).id())); + } + sourceType = new RowType(srcFields); + } return Tuple2.of(toDataStream(source), sourceType); } + /** + * Validate the SET targets and build the SQL projection list. A target may address a top-level + * column ({@code col} / {@code T.col}) or, for sub-field-level data evolution, a nested + * sub-field ({@code nest.a} / {@code T.nest.a}). A partially-updated struct column is rebuilt + * as a partial {@code CAST(ROW(...) AS ROW<...>)} so only the touched sub-fields are written. + * Also sets {@link #writePaths}. 
+ */ + private List buildExplicitProject() { + Map changes = parseCommaSeparatedKeyValues(matchedUpdateSet); + + // group by top-level column, preserving first-seen order + Map wholeCols = new LinkedHashMap<>(); + Map> nestedCols = new LinkedHashMap<>(); + List order = new ArrayList<>(); + + for (Map.Entry entry : changes.entrySet()) { + List path = parseTargetPath(entry.getKey()); + String topCol = path.get(0); + if (!targetFieldNames.contains(topCol)) { + throw new RuntimeException( + String.format( + "Invalid column reference '%s' of table '%s' at matched-upsert action.", + entry.getKey(), identifier.getFullName())); + } + if (!order.contains(topCol)) { + order.add(topCol); + } + if (path.size() == 1) { + // whole top-level column + if (nestedCols.containsKey(topCol) || wholeCols.containsKey(topCol)) { + throw new RuntimeException( + "Conflicting updates for column '" + topCol + "' in SET clause."); + } + wholeCols.put(topCol, entry.getValue()); + } else { + // nested sub-field update + if (!coreOptions.dataEvolutionNestedFieldEnabled()) { + throw new UnsupportedOperationException( + "Updating a nested sub-field ('" + + entry.getKey() + + "') requires '" + + CoreOptions.DATA_EVOLUTION_NESTED_FIELD_ENABLED.key() + + "=true'."); + } + if (path.size() > 2) { + throw new UnsupportedOperationException( + "Sub-field-level data evolution only supports one level of nesting, " + + "but got '" + + entry.getKey() + + "'."); + } + if (wholeCols.containsKey(topCol)) { + throw new RuntimeException( + "Conflicting updates for column '" + topCol + "' in SET clause."); + } + String subName = path.get(1); + LinkedHashMap subs = + nestedCols.computeIfAbsent(topCol, k -> new LinkedHashMap<>()); + if (subs.containsKey(subName)) { + throw new RuntimeException( + "Duplicated update for sub-field '" + + topCol + + "." 
+ + subName + + "' in SET clause."); + } + subs.put(subName, entry.getValue()); + } + } + + // first pass: writePaths (so projectByPaths can build the pruned nested types) + writePaths = new ArrayList<>(); + for (String topCol : order) { + if (wholeCols.containsKey(topCol)) { + writePaths.add(topCol); + } else { + for (String subName : nestedCols.get(topCol).keySet()) { + writePaths.add(topCol + "." + subName); + } + } + } + + // second pass: build projection expressions + RowType pruned = table.rowType().projectByPaths(writePaths); + List project = new ArrayList<>(); + for (String topCol : order) { + if (wholeCols.containsKey(topCol)) { + project.add(String.format("%s AS `%s`", wholeCols.get(topCol), topCol)); + } else { + LinkedHashMap subs = nestedCols.get(topCol); + DataType prunedColType = + pruned.getField(table.rowType().getField(topCol).id()).type(); + // value order must match the pruned struct's schema field order + List values = new ArrayList<>(); + for (DataField subField : ((RowType) prunedColType).getFields()) { + String value = subs.get(subField.name()); + Preconditions.checkState( + value != null, + "Missing value for sub-field '%s.%s', it's a bug.", + topCol, + subField.name()); + values.add(value); + } + String typeStr = + LogicalTypeConversion.toLogicalType(prunedColType).asSerializableString(); + project.add( + String.format( + "CAST(ROW(%s) AS %s) AS `%s`", + String.join(", ", values), typeStr, topCol)); + } + } + return project; + } + + /** The first-seen order of top-level columns present in the (dotted) write paths. */ + private List explicitTopColumnOrder(List paths) { + List order = new ArrayList<>(); + for (String path : paths) { + int dot = path.indexOf('.'); + String topCol = dot < 0 ? 
path : path.substring(0, dot); + if (!order.contains(topCol)) { + order.add(topCol); + } + } + return order; + } + + /** + * Parse a SET target into a path relative to the target table: strip an optional leading + * table-qualifier segment (the target table name/alias), leaving {@code [topColumn, sub...]}. + */ + private List parseTargetPath(String target) { + List segs = new ArrayList<>(Arrays.asList(target.split("\\."))); + if (segs.size() > 1 && segs.get(0).equals(targetTableName())) { + segs.remove(0); + } + return segs; + } + public DataStream> shuffleByFirstRowId( DataStream source, RowType sourceType) { Transformation sourceTransformation = source.getTransformation(); @@ -389,7 +534,7 @@ public DataStream writePartialColumns( "PARTIAL WRITE COLUMNS", new CommittableTypeInfo(), new DataEvolutionPartialWriteOperator( - (FileStoreTable) table, rowType, baseSnapshotId)) + (FileStoreTable) table, rowType, writePaths, baseSnapshotId)) .setParallelism(sinkParallelism); } @@ -526,7 +671,23 @@ private void checkSchema(Table source) { .getTypeRoot() .getFamilies() .contains(DataTypeFamily.BINARY_STRING); + // Struct columns need a structural compatibility check: DataTypeCasts does not + // support ROW-to-ROW casts. For a sub-field write (dotted paths like nest.a) the + // source is a partial (subset) struct carrying only the updated sub-fields, so a + // subset check is correct. For a whole-column assignment (e.g. T.nest=S.nest) the + // source must fully cover the target struct, so a narrower source is rejected + // instead of being written as an incomplete whole-struct file. + boolean structCompatible = false; + if (paimonType instanceof RowType && targetField.type() instanceof RowType) { + RowType sourceStruct = (RowType) paimonType; + RowType targetStruct = (RowType) targetField.type(); + structCompatible = + isSubFieldWrite(flinkColumn.getName()) + ? 
isCompatiblePartialStruct(sourceStruct, targetStruct) + : isFullyCompatibleStruct(sourceStruct, targetStruct); + } if (!blobCompatible + && !structCompatible && !DataTypeCasts.supportsCompatibleCast(paimonType, targetField.type())) { throw new IllegalStateException( String.format( @@ -540,6 +701,61 @@ private void checkSchema(Table source) { } } + /** + * Whether {@code part} is a valid partial (subset) of the full struct {@code full}: every + * sub-field of {@code part} must exist in {@code full} (by name) with a compatible cast. + */ + private boolean isCompatiblePartialStruct(RowType part, RowType full) { + for (DataField partField : part.getFields()) { + if (!full.containsField(partField.name())) { + return false; + } + DataType fullSubType = full.getField(partField.name()).type(); + if (partField.type() instanceof RowType && fullSubType instanceof RowType) { + if (!isCompatiblePartialStruct((RowType) partField.type(), (RowType) fullSubType)) { + return false; + } + } else if (!DataTypeCasts.supportsCompatibleCast(partField.type(), fullSubType)) { + return false; + } + } + return true; + } + + /** + * Whether {@code source} fully covers the target struct {@code target} for a whole-column + * assignment: every target sub-field must exist in {@code source} (by name) with a compatible + * cast. A source missing a target sub-field (a narrower struct) is rejected. 
+ */ + private boolean isFullyCompatibleStruct(RowType source, RowType target) { + for (DataField targetField : target.getFields()) { + if (!source.containsField(targetField.name())) { + return false; + } + DataType sourceSubType = source.getField(targetField.name()).type(); + DataType targetSubType = targetField.type(); + if (sourceSubType instanceof RowType && targetSubType instanceof RowType) { + if (!isFullyCompatibleStruct((RowType) sourceSubType, (RowType) targetSubType)) { + return false; + } + } else if (!DataTypeCasts.supportsCompatibleCast(sourceSubType, targetSubType)) { + return false; + } + } + return true; + } + + /** + * Whether the given top-level column is written through dotted sub-field paths (e.g. nest.a). + */ + private boolean isSubFieldWrite(String topColumn) { + if (writePaths == null) { + return false; + } + String prefix = topColumn + "."; + return writePaths.stream().anyMatch(p -> p.startsWith(prefix)); + } + private void handleSqls() { // NOTE: sql may change current catalog and database if (sourceSqls != null) { @@ -570,11 +786,6 @@ private String escapedSourceName() { .collect(Collectors.joining(".")); } - private String extractFieldName(String sourceField) { - String[] fieldPath = sourceField.split("\\."); - return fieldPath[fieldPath.length - 1]; - } - private String escapedRowTrackingTargetName() { return String.format( "`%s`.`%s`.`%s$row_tracking`", diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/DataEvolutionPartialWriteOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/DataEvolutionPartialWriteOperator.java index f57393b7ca27..5561c9581b7a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/DataEvolutionPartialWriteOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/DataEvolutionPartialWriteOperator.java @@ -54,7 +54,6 @@ import 
java.util.List; import java.util.Map; import java.util.TreeSet; -import java.util.stream.Collectors; import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile; import static org.apache.paimon.types.VectorType.isVectorStoreFile; @@ -93,17 +92,19 @@ public class DataEvolutionPartialWriteOperator private transient Writer writer; public DataEvolutionPartialWriteOperator( - FileStoreTable table, RowType dataType, Long baseSnapshotId) { + FileStoreTable table, + RowType sourceType, + List writePaths, + Long baseSnapshotId) { this.table = table.copy(Collections.singletonMap(CoreOptions.TARGET_FILE_SIZE.key(), "99999 G")); this.baseSnapshotId = baseSnapshotId; - List fieldNames = - dataType.getFieldNames().stream() - .filter(name -> !SpecialFields.ROW_ID.name().equals(name)) - .collect(Collectors.toList()); - this.writeType = table.rowType().project(fieldNames); - this.dataType = - SpecialFields.rowTypeWithRowId(table.rowType()).project(dataType.getFieldNames()); + // writePaths may carry nested dotted paths (e.g. "nest.a") for sub-field-level data + // evolution; projectByPaths handles both plain top-level names and nested paths. + this.writeType = table.rowType().projectByPaths(writePaths); + // sourceType is already pruned to the written columns (with partial nested structs) and + // carries the table's field ids, so it is used directly as the read/data type. 
+ this.dataType = sourceType; this.rowIdIndex = this.dataType.getFieldIndex(SpecialFields.ROW_ID.name()); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/NestedSubfieldMergeIntoActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/NestedSubfieldMergeIntoActionITCase.java new file mode 100644 index 000000000000..86f7f36ccdda --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/NestedSubfieldMergeIntoActionITCase.java @@ -0,0 +1,363 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.flink.action; + +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.manifest.ManifestEntry; +import org.apache.paimon.table.FileStoreTable; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; + +import static org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow; +import static org.apache.paimon.CoreOptions.DATA_EVOLUTION_ENABLED; +import static org.apache.paimon.CoreOptions.DATA_EVOLUTION_NESTED_FIELD_ENABLED; +import static org.apache.paimon.CoreOptions.ROW_TRACKING_ENABLED; +import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.buildDdl; +import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.init; +import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.insertInto; +import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.sEnv; +import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.testBatchRead; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** + * ITCase for sub-field-level data evolution via {@link DataEvolutionMergeIntoAction}: updating a + * single sub-field of a nested struct column should write an incremental file containing only that + * sub-field (a dotted write column like {@code nest.a}) aligned by row id, while the rest of the + * struct is read back from the original file. 
+ */ +public class NestedSubfieldMergeIntoActionITCase extends ActionITCaseBase { + + @BeforeEach + @Override + public void before() throws IOException { + super.before(); + init(warehouse); + } + + private void prepareNestedTarget(boolean nestedFieldEnabled) throws Exception { + sEnv.executeSql( + buildDdl( + "T", + Arrays.asList("id INT", "nest ROW"), + Collections.emptyList(), + Collections.emptyList(), + new HashMap() { + { + put(ROW_TRACKING_ENABLED.key(), "true"); + put(DATA_EVOLUTION_ENABLED.key(), "true"); + if (nestedFieldEnabled) { + put(DATA_EVOLUTION_NESTED_FIELD_ENABLED.key(), "true"); + } + } + })); + insertInto( + "T", + "(1, CAST(ROW(10, 'x') AS ROW))", + "(2, CAST(ROW(20, 'y') AS ROW))"); + } + + private void prepareSubFieldSource() throws Exception { + sEnv.executeSql( + buildDdl( + "S", + Arrays.asList("id INT", "newa INT"), + Collections.emptyList(), + Collections.emptyList(), + new HashMap() { + { + put(ROW_TRACKING_ENABLED.key(), "true"); + put(DATA_EVOLUTION_ENABLED.key(), "true"); + } + })); + insertInto("S", "(1, 100)"); + } + + @Test + public void testUpdateSingleSubFieldWritesOnlyThatLeaf() throws Exception { + prepareNestedTarget(true); + prepareSubFieldSource(); + + builder(warehouse, database, "T") + .withMergeCondition("T.id=S.id") + .withMatchedUpdateSet("T.nest.a=S.newa") + .withSourceTable("S") + .withSinkParallelism(2) + .build() + .run(); + + // correctness: nest.a updated for id=1, nest.b preserved, other row untouched + testBatchRead( + "SELECT id, nest.a, nest.b FROM T", + Arrays.asList(changelogRow("+I", 1, 100, "x"), changelogRow("+I", 2, 20, "y"))); + + // feature engaged: an incremental file written by the merge only contains nest.a + assertThat(deltaWriteCols("T")).contains(Collections.singletonList("nest.a")); + } + + @Test + public void testUpdateSubFieldDisabledThrows() throws Exception { + // data-evolution.nested-field.enabled left at its default (false) + prepareNestedTarget(false); + prepareSubFieldSource(); + + 
assertThatThrownBy( + () -> + builder(warehouse, database, "T") + .withMergeCondition("T.id=S.id") + .withMatchedUpdateSet("T.nest.a=S.newa") + .withSourceTable("S") + .withSinkParallelism(2) + .build() + .run()) + .hasMessageContaining(DATA_EVOLUTION_NESTED_FIELD_ENABLED.key()); + } + + @Test + public void testUpdateWholeStructStillWorks() throws Exception { + prepareNestedTarget(true); + sEnv.executeSql( + buildDdl( + "S", + Arrays.asList("id INT", "nest ROW"), + Collections.emptyList(), + Collections.emptyList(), + new HashMap() { + { + put(ROW_TRACKING_ENABLED.key(), "true"); + put(DATA_EVOLUTION_ENABLED.key(), "true"); + } + })); + insertInto("S", "(1, CAST(ROW(100, 'z') AS ROW))"); + + builder(warehouse, database, "T") + .withMergeCondition("T.id=S.id") + .withMatchedUpdateSet("T.nest=S.nest") + .withSourceTable("S") + .withSinkParallelism(2) + .build() + .run(); + + testBatchRead( + "SELECT id, nest.a, nest.b FROM T", + Arrays.asList(changelogRow("+I", 1, 100, "z"), changelogRow("+I", 2, 20, "y"))); + } + + @Test + public void testWholeStructAssignmentWithNarrowerSourceThrows() throws Exception { + // target nest is ROW; a whole-column assignment from a narrower source ROW must be + // rejected (it would otherwise be written as an incomplete whole-struct file) + prepareNestedTarget(true); + sEnv.executeSql( + buildDdl( + "S", + Arrays.asList("id INT", "nest ROW"), + Collections.emptyList(), + Collections.emptyList(), + new HashMap() { + { + put(ROW_TRACKING_ENABLED.key(), "true"); + put(DATA_EVOLUTION_ENABLED.key(), "true"); + } + })); + insertInto("S", "(1, CAST(ROW(100) AS ROW))"); + + assertThatThrownBy( + () -> + builder(warehouse, database, "T") + .withMergeCondition("T.id=S.id") + .withMatchedUpdateSet("T.nest=S.nest") + .withSourceTable("S") + .withSinkParallelism(2) + .build() + .run()) + .hasMessageContaining("incompatible"); + } + + @Test + public void testUpdateMultipleSubFieldsWritesOnlyThoseLeaves() throws Exception { + // a 3-field struct so 
that updating two sub-fields is a strict subset (stays + // sub-field-level + // rather than collapsing to a whole-column write) + sEnv.executeSql( + buildDdl( + "T", + Arrays.asList("id INT", "nest ROW"), + Collections.emptyList(), + Collections.emptyList(), + new HashMap() { + { + put(ROW_TRACKING_ENABLED.key(), "true"); + put(DATA_EVOLUTION_ENABLED.key(), "true"); + put(DATA_EVOLUTION_NESTED_FIELD_ENABLED.key(), "true"); + } + })); + insertInto( + "T", + "(1, CAST(ROW(10, 'x', 30) AS ROW))", + "(2, CAST(ROW(20, 'y', 40) AS ROW))"); + + sEnv.executeSql( + buildDdl( + "S", + Arrays.asList("id INT", "newa INT", "newc INT"), + Collections.emptyList(), + Collections.emptyList(), + new HashMap() { + { + put(ROW_TRACKING_ENABLED.key(), "true"); + put(DATA_EVOLUTION_ENABLED.key(), "true"); + } + })); + insertInto("S", "(1, 100, 300)"); + + builder(warehouse, database, "T") + .withMergeCondition("T.id=S.id") + .withMatchedUpdateSet("T.nest.a=S.newa,T.nest.c=S.newc") + .withSourceTable("S") + .withSinkParallelism(2) + .build() + .run(); + + // correctness: a and c updated for id=1, b preserved, other row untouched + testBatchRead( + "SELECT id, nest.a, nest.b, nest.c FROM T", + Arrays.asList( + changelogRow("+I", 1, 100, "x", 300), changelogRow("+I", 2, 20, "y", 40))); + + // feature engaged: the incremental file contains exactly the two touched leaves, in schema + // order (a before c) + assertThat(deltaWriteCols("T")).contains(Arrays.asList("nest.a", "nest.c")); + } + + @Test + public void testUpdateDeeplyNestedSubFieldThrows() throws Exception { + sEnv.executeSql( + buildDdl( + "T", + Arrays.asList("id INT", "nest ROW>"), + Collections.emptyList(), + Collections.emptyList(), + new HashMap() { + { + put(ROW_TRACKING_ENABLED.key(), "true"); + put(DATA_EVOLUTION_ENABLED.key(), "true"); + put(DATA_EVOLUTION_NESTED_FIELD_ENABLED.key(), "true"); + } + })); + insertInto("T", "(1, CAST(ROW(10, ROW(1, 2)) AS ROW>))"); + + sEnv.executeSql( + buildDdl( + "S", + Arrays.asList("id 
INT", "newx INT"), + Collections.emptyList(), + Collections.emptyList(), + new HashMap() { + { + put(ROW_TRACKING_ENABLED.key(), "true"); + put(DATA_EVOLUTION_ENABLED.key(), "true"); + } + })); + insertInto("S", "(1, 100)"); + + // deeper-than-one-level sub-field updates are rejected (the reader composes only one level) + assertThatThrownBy( + () -> + builder(warehouse, database, "T") + .withMergeCondition("T.id=S.id") + .withMatchedUpdateSet("T.nest.sub.x=S.newx") + .withSourceTable("S") + .withSinkParallelism(2) + .build() + .run()) + .hasMessageContaining("one level"); + } + + /** The write columns of every data file in the latest snapshot of {@code tableName}. */ + private List> deltaWriteCols(String tableName) throws Exception { + FileStoreTable table = getFileStoreTable(tableName); + List> result = new ArrayList<>(); + for (ManifestEntry entry : table.store().newScan().plan().files()) { + DataFileMeta file = entry.file(); + result.add(file.writeCols()); + } + return result; + } + + private DataEvolutionMergeIntoActionBuilder builder( + String warehouse, String database, String table) { + return new DataEvolutionMergeIntoActionBuilder(warehouse, database, table); + } + + private static class DataEvolutionMergeIntoActionBuilder { + private final List args; + + DataEvolutionMergeIntoActionBuilder(String warehouse, String database, String table) { + this.args = + new ArrayList<>( + Arrays.asList( + "data_evolution_merge_into", + "--warehouse", + warehouse, + "--database", + database, + "--table", + table)); + } + + DataEvolutionMergeIntoActionBuilder withSourceTable(String sourceTable) { + args.add("--source_table"); + args.add(sourceTable); + return this; + } + + DataEvolutionMergeIntoActionBuilder withMergeCondition(String mergeCondition) { + args.add("--on"); + args.add(mergeCondition); + return this; + } + + DataEvolutionMergeIntoActionBuilder withMatchedUpdateSet(String matchedUpdateSet) { + args.add("--matched_update_set"); + args.add(matchedUpdateSet); + 
return this; + } + + DataEvolutionMergeIntoActionBuilder withSinkParallelism(int sinkParallelism) { + args.add("--sink_parallelism"); + args.add(String.valueOf(sinkParallelism)); + return this; + } + + DataEvolutionMergeIntoAction build() { + return (DataEvolutionMergeIntoAction) + ActionFactory.createAction(args.toArray(new String[0])) + .orElseThrow(RuntimeException::new); + } + } +} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala index 1c93bdcb754d..84054b554db8 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala @@ -26,6 +26,7 @@ import org.apache.paimon.table.sink._ import org.apache.paimon.table.source.DataSplit import org.apache.paimon.types.DataType import org.apache.paimon.types.DataTypeRoot.BLOB +import org.apache.paimon.types.RowType import org.apache.paimon.types.VectorType.isVectorStoreFile import org.apache.paimon.utils.SerializationUtils @@ -44,14 +45,27 @@ case class DataEvolutionPaimonWriter(paimonTable: FileStoreTable, dataSplits: Se override val table: FileStoreTable = paimonTable.copy(Collections.singletonMap(CoreOptions.TARGET_FILE_SIZE.key(), "99999 G")) + // Whole top-level column write (kept for callers that only update full columns). def writePartialFields( data: DataFrame, columnNames: Seq[String], rawBlobPlaceholderMarkerColumns: Map[String, String] = Map.empty): Seq[CommitMessage] = { + writePartialFields( + data, + table.rowType().projectByPaths(columnNames.asJava), + rawBlobPlaceholderMarkerColumns) + } + + // Sub-field-aware write: writeType is already pruned to the written top-level columns and + // (possibly) nested sub-fields via dotted paths. 
+ def writePartialFields( + data: DataFrame, + writeType: RowType, + rawBlobPlaceholderMarkerColumns: Map[String, String]): Seq[CommitMessage] = { val sparkSession = data.sparkSession import sparkSession.implicits._ - assert(data.columns.length == columnNames.size + 2 + rawBlobPlaceholderMarkerColumns.size) - val writeType = table.rowType().project(columnNames.asJava) + assert( + data.columns.length == writeType.getFieldCount + 2 + rawBlobPlaceholderMarkerColumns.size) val options = new CoreOptions(table.schema().options()) val blobInlineFields = options.blobInlineField().asScala.toSeq diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala index 7464176c8763..faca50657c09 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala @@ -44,7 +44,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.{Dataset, Row, SparkSession} import org.apache.spark.sql.PaimonUtils._ import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer.resolver -import org.apache.spark.sql.catalyst.expressions.{Alias, And, AttributeReference, EqualTo, Expression, ExprId, Literal, Or, PythonUDF, SubqueryExpression} +import org.apache.spark.sql.catalyst.expressions.{Alias, And, AttributeReference, CreateNamedStruct, EqualTo, Expression, ExprId, GetStructField, Literal, Or, PythonUDF, SubqueryExpression} import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral} import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftOuter} import org.apache.spark.sql.catalyst.plans.logical._ @@ -385,7 +385,63 @@ case class MergeIntoPaimonDataEvolutionTable( val 
rawBlobMarkerAttributes = rawBlobUpdateColumns.map( attr => AttributeReference(rawBlobMarkerNamesByColumn(attr.name), BooleanType, nullable = false)()) - val mergeOutput = updateColumnsSorted ++ metadataColumns ++ rawBlobMarkerAttributes + + // Sub-field-level pruning: for a struct column whose SET only touches some sub-fields, only the + // changed leaves are written (an incremental column-group file containing the partial struct); + // the rest are copied from the target. Falls back to whole-column write when the changed leaves + // cannot be safely determined, so behaviour never regresses. + val matchedUpdateActions = matchedActions.collect { case ua: UpdateAction => ua } + // Gated by data-evolution.nested-field.enabled (default off): when disabled, no column is + // pruned, so every struct column is rewritten whole (behaviour identical to before this + // feature). When enabled, struct columns whose SET only touches some sub-fields are pruned. + val nestedFieldEnabled = table.coreOptions().dataEvolutionNestedFieldEnabled() + val prunedByExprId: Map[ExprId, (Seq[Seq[String]], StructType)] = + if (!nestedFieldEnabled) Map.empty + else + updateColumnsSorted.flatMap { + attr => + if (rawBlobUpdateColumns.exists(_.sameRef(attr))) { + None + } else { + attr.dataType match { + case st: StructType => + val perAction = matchedUpdateActions.flatMap { + ua => + ua.assignments + .find( + a => isModifiedAssignment(a) && assignmentKeyAttribute(a).sameRef(attr)) + .map(a => changedLeaves(a.value, st, attr)) + } + if (perAction.isEmpty || perAction.exists(_.isEmpty)) { + None + } else { + val union = perAction.flatten.flatten.map(_._1).distinct + // The reader composes a split struct only one level deep, so only prune when + // every change addresses a direct sub-field of the top-level struct (depth 1). + // A deeper change (e.g. nest.inner.x) would split an inner struct across files, + // which the read path does not support, so fall back to a whole-column write. 
+ // Prune only when the changed direct sub-fields are a strict subset of all of + // them (otherwise the whole column is rewritten anyway). + val allDepthOne = union.forall(_.size == 1) + if (union.isEmpty || !allDepthOne || union.size >= st.fields.length) { + None + } else { + Some(attr.exprId -> (union, prunedStructType(st, union))) + } + } + case _ => None + } + } + }.toMap + + val mergeOutput = updateColumnsSorted.map { + attr => + prunedByExprId.get(attr.exprId) match { + case Some((_, prunedType)) => + AttributeReference(attr.name, prunedType, attr.nullable)() + case None => attr + } + } ++ metadataColumns ++ rawBlobMarkerAttributes val realUpdateActions = matchedActions .map(s => s.asInstanceOf[UpdateAction]) @@ -438,7 +494,20 @@ case class MergeIntoPaimonDataEvolutionTable( ) { Literal(null, attr.dataType) } else { - assignmentValue(action, attr) + prunedByExprId.get(attr.exprId) match { + case Some((paths, _)) => + val st = attr.dataType.asInstanceOf[StructType] + val actionMap = + changedLeaves(assignmentValue(action, attr), st, attr) + .getOrElse(Seq.empty) + .toMap + buildPrunedStruct( + st, + Nil, + paths, + p => actionMap.getOrElse(p, passthroughExpr(attr, st, p))) + case None => assignmentValue(action, attr) + } } } val metadata = metadataColumns.map(attr => assignmentValue(action, attr)) @@ -460,7 +529,12 @@ case class MergeIntoPaimonDataEvolutionTable( if (rawBlobUpdateColumns.exists(_.sameRef(attr))) { Literal(null, attr.dataType) } else { - attr + prunedByExprId.get(attr.exprId) match { + case Some((paths, _)) => + val st = attr.dataType.asInstanceOf[StructType] + buildPrunedStruct(st, Nil, paths, p => passthroughExpr(attr, st, p)) + case None => attr + } } } copiedColumns ++ metadataColumns ++ rawBlobUpdateColumns.map(_ => TrueLiteral) @@ -611,10 +685,20 @@ case class MergeIntoPaimonDataEvolutionTable( .sortWithinPartitions(FIRST_ROW_ID_NAME, ROW_ID_NAME) } + // dotted write paths: a whole column -> its name; a pruned struct -> 
"col.subfield..." leaves + val writePaths = updateColumnsSorted.flatMap { + attr => + prunedByExprId.get(attr.exprId) match { + case Some((paths, _)) => paths.map(p => (attr.name +: p).mkString(".")) + case None => Seq(attr.name) + } + } + val writeType = table.rowType().projectByPaths(writePaths.asJava) + val writer = DataEvolutionPaimonWriter(table, dataSplits) writer.writePartialFields( toWrite, - updateColumnsSorted.map(_.name), + writeType, rawBlobUpdateColumns.map(attr => attr.name -> rawBlobMarkerNamesByColumn(attr.name)).toMap) } @@ -857,6 +941,135 @@ object MergeIntoPaimonDataEvolutionTable { col("`" + name.replace("`", "``") + "`") } + /** + * For an aligned UPDATE value of a struct column, return the changed leaf paths (relative to the + * column) paired with their new value expression, or `None` if the value is not a recognizable + * named-struct rebuild (in which case the whole column must be written). + */ + private[commands] def changedLeaves( + value: Expression, + structType: StructType, + base: Expression): Option[Seq[(Seq[String], Expression)]] = { + val out = mutable.LinkedHashMap.empty[Seq[String], Expression] + if (collectChanges(value, structType, base, Nil, out)) Some(out.toSeq) else None + } + + private def collectChanges( + value: Expression, + structType: StructType, + base: Expression, + prefix: Seq[String], + out: mutable.LinkedHashMap[Seq[String], Expression]): Boolean = { + value match { + case cns: CreateNamedStruct => + val pairs = cns.children.grouped(2).toSeq + if (pairs.exists(_.size != 2)) { + return false + } + var ok = true + pairs.foreach { + pair => + val nameExpr = pair.head + val v = pair(1) + nameExpr match { + case Literal(nameVal, _) if nameVal != null => + val name = nameVal.toString + val ordinalOpt = { + val i = structType.fieldNames.indexOf(name) + if (i >= 0) Some(i) else None + } + ordinalOpt match { + case Some(ordinal) => + val fieldType = structType(ordinal).dataType + val passthrough = GetStructField(base, 
ordinal, Some(name)) + if (!v.semanticEquals(passthrough)) { + (fieldType, v) match { + case (st: StructType, inner: CreateNamedStruct) => + ok = collectChanges( + inner, + st, + GetStructField(base, ordinal, Some(name)), + prefix :+ name, + out) && ok + case _ => + out.put(prefix :+ name, v) + } + } + case None => ok = false + } + case _ => ok = false + } + } + ok + case _ => false + } + } + + /** Build a Spark StructType containing only the given (possibly nested) leaf paths. */ + private[commands] def prunedStructType( + structType: StructType, + paths: Seq[Seq[String]]): StructType = { + val byHead = paths.filter(_.nonEmpty).groupBy(_.head) + val fields = structType.fields.filter(f => byHead.contains(f.name)).map { + f => + val sub = byHead(f.name) + if (sub.exists(_.size == 1)) { + f + } else { + f.copy(dataType = prunedStructType(f.dataType.asInstanceOf[StructType], sub.map(_.tail))) + } + } + StructType(fields) + } + + /** Build a named_struct over the given leaf paths; each terminal leaf value comes from valueFn. */ + private[commands] def buildPrunedStruct( + structType: StructType, + prefix: Seq[String], + paths: Seq[Seq[String]], + valueFn: Seq[String] => Expression): CreateNamedStruct = { + val byHead = paths.filter(_.nonEmpty).groupBy(_.head) + val args = structType.fields.flatMap { + f => + byHead.get(f.name) match { + case None => Seq.empty[Expression] + case Some(sub) => + val fieldPath = prefix :+ f.name + val expr = + if (sub.exists(_.size == 1)) { + valueFn(fieldPath) + } else { + buildPrunedStruct( + f.dataType.asInstanceOf[StructType], + fieldPath, + sub.map(_.tail), + valueFn) + } + Seq(Literal(f.name): Expression, expr) + } + } + CreateNamedStruct(args.toSeq) + } + + /** Read a (possibly nested) leaf from a base expression via GetStructField chain. 
*/ + private[commands] def passthroughExpr( + base: Expression, + structType: StructType, + path: Seq[String]): Expression = { + if (path.isEmpty) { + base + } else { + val head = path.head + val ordinal = structType.fieldIndex(head) + val child = GetStructField(base, ordinal, Some(head)) + if (path.tail.isEmpty) { + child + } else { + passthroughExpr(child, structType(ordinal).dataType.asInstanceOf[StructType], path.tail) + } + } + } + private def sameAttributeReference(left: Expression, right: Expression): Boolean = { (left, right) match { case (leftAttr: AttributeReference, rightAttr: AttributeReference) => diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/NestedSubfieldMergeIntoTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/NestedSubfieldMergeIntoTest.scala new file mode 100644 index 000000000000..d1cbec5d1315 --- /dev/null +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/NestedSubfieldMergeIntoTest.scala @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.spark.sql + +import org.apache.paimon.spark.PaimonSparkTestBase +import org.apache.paimon.table.source.DataSplit + +import org.apache.spark.sql.Row + +import scala.collection.JavaConverters._ + +/** + * End-to-end tests for sub-field-level data evolution via Spark `MERGE INTO`: updating a single + * sub-field of a nested struct column should write an incremental file containing only that + * sub-field (a dotted write column like `nest.a`), aligned by row-id, while the rest of the struct + * is read back from the original file. + */ +class NestedSubfieldMergeIntoTest extends PaimonSparkTestBase { + + import testImplicits._ + + private def latestDeltaWriteCols(tableName: String): Seq[Seq[String]] = { + val t = loadTable(tableName) + val splits = t.newSnapshotReader().read().splits().asScala + splits + .flatMap(_.asInstanceOf[DataSplit].dataFiles().asScala) + .map(f => Option(f.writeCols()).map(_.asScala.toSeq).getOrElse(Seq.empty)) + .toSeq + } + + test("Sub-field data evolution: MERGE INTO updating one struct sub-field writes only that leaf") { + withTable("s", "t") { + sql(s""" + |CREATE TABLE t (id INT, nest STRUCT) TBLPROPERTIES ( + | 'row-tracking.enabled' = 'true', + | 'data-evolution.enabled' = 'true', + | 'data-evolution.nested-field.enabled' = 'true') + |""".stripMargin) + sql( + "INSERT INTO t VALUES (1, named_struct('a', 10, 'b', 'x')), " + + "(2, named_struct('a', 20, 'b', 'y'))") + + Seq((1, 100)).toDF("id", "newa").createOrReplaceTempView("s") + + sql(s""" + |MERGE INTO t + |USING s + |ON t.id = s.id + |WHEN MATCHED THEN UPDATE SET t.nest.a = s.newa + |""".stripMargin).collect() + + // correctness: nest.a updated for id=1, nest.b preserved, other row untouched + checkAnswer( + sql("SELECT id, nest.a, nest.b FROM t ORDER BY id"), + Seq(Row(1, 100, "x"), Row(2, 20, "y"))) + + // feature engaged: the incremental file written by the merge only contains nest.a + val deltaCols = latestDeltaWriteCols("t") + assert( + 
deltaCols.exists(cols => cols == Seq("nest.a")), + s"expected an incremental file with writeCols == [nest.a], got: $deltaCols") + } + } + + test("Sub-field data evolution: updating whole struct still writes the whole column") { + withTable("s", "t") { + sql(s""" + |CREATE TABLE t (id INT, nest STRUCT) TBLPROPERTIES ( + | 'row-tracking.enabled' = 'true', + | 'data-evolution.enabled' = 'true', + | 'data-evolution.nested-field.enabled' = 'true') + |""".stripMargin) + sql("INSERT INTO t VALUES (1, named_struct('a', 10, 'b', 'x'))") + + Seq((1, 100, "z")).toDF("id", "newa", "newb").createOrReplaceTempView("s") + + sql(s""" + |MERGE INTO t + |USING s + |ON t.id = s.id + |WHEN MATCHED THEN UPDATE SET t.nest = named_struct('a', s.newa, 'b', s.newb) + |""".stripMargin).collect() + + checkAnswer(sql("SELECT id, nest.a, nest.b FROM t"), Seq(Row(1, 100, "z"))) + } + } + + test("Sub-field data evolution: disabled by default, sub-field update rewrites the whole column") { + withTable("s", "t") { + // data-evolution.nested-field.enabled is left at its default (false) + sql(s""" + |CREATE TABLE t (id INT, nest STRUCT) TBLPROPERTIES ( + | 'row-tracking.enabled' = 'true', + | 'data-evolution.enabled' = 'true') + |""".stripMargin) + sql("INSERT INTO t VALUES (1, named_struct('a', 10, 'b', 'x'))") + + Seq((1, 100)).toDF("id", "newa").createOrReplaceTempView("s") + + sql(s""" + |MERGE INTO t + |USING s + |ON t.id = s.id + |WHEN MATCHED THEN UPDATE SET t.nest.a = s.newa + |""".stripMargin).collect() + + // correctness still holds: nest.a updated, nest.b preserved + checkAnswer(sql("SELECT id, nest.a, nest.b FROM t"), Seq(Row(1, 100, "x"))) + + // but no sub-field incremental file is produced: the whole nest column is rewritten + val deltaCols = latestDeltaWriteCols("t") + assert( + !deltaCols.exists(cols => cols.contains("nest.a")), + s"expected no dotted (sub-field) writeCols when feature is disabled, got: $deltaCols") + } + } +}