From 02dbe12d8aa891230cba1aa687b8721b07f8b43a Mon Sep 17 00:00:00 2001 From: AuroraVoyage Date: Sun, 28 Jun 2026 15:05:10 +0800 Subject: [PATCH 1/3] [core] Introduce nested-key-null-strategy for handling non-PK-compliant nested keys in FieldNestedUpdateAgg --- .../merge-engine/aggregation.mdx | 8 + .../java/org/apache/paimon/CoreOptions.java | 45 ++ .../aggregate/FieldNestedUpdateAgg.java | 51 +- .../factory/FieldNestedUpdateAggFactory.java | 4 +- .../aggregate/FieldAggregatorTest.java | 526 +++++++++++++++++- 5 files changed, 625 insertions(+), 9 deletions(-) diff --git a/docs/docs/primary-key-table/merge-engine/aggregation.mdx b/docs/docs/primary-key-table/merge-engine/aggregation.mdx index 96e6e4f3f076..c8ed77f3f42d 100644 --- a/docs/docs/primary-key-table/merge-engine/aggregation.mdx +++ b/docs/docs/primary-key-table/merge-engine/aggregation.mdx @@ -311,6 +311,14 @@ public static class BitmapContainsUDF extends ScalarFunction { Use `fields.<field-name>.nested-key=pk0,pk1,...` to specify the primary keys of the nested table. If no keys, row will be appended to array\<row>. + Use `fields.<field-name>.nested-key-null-strategy=<strategy>` to specify how rows are handled when the nested-key does not satisfy primary key semantics (e.g., primary key fields contain null values). + + This option is optional. If it is not configured, the behavior is equivalent to using `merge`. + + - `merge`: Merge rows even if the nested-key does not satisfy primary key semantics. This is the same behavior as when this option is not configured. + - `ignore`: Ignore rows whose nested-key contains null values because they do not satisfy primary key semantics. + - `error`: Throw an exception if the nested-key contains null values, because primary key fields must not be null. + Use `fields.<field-name>.nested-sequence-field=seq0,seq1,...` to control the update sequence of a nested table, you must configure `fields.<field-name>.nested-key` when using it. Use `fields.<field-name>.count-limit=<Integer>` to specify the maximum number of rows in the nested table.
When no nested-key, it will select data diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index e3b6b01952cc..e6fd6137bad9 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -80,6 +80,8 @@ public class CoreOptions implements Serializable { public static final String NESTED_KEY = "nested-key"; + public static final String NESTED_KEY_NULL_STRATEGY = "nested-key-null-strategy"; + public static final String NESTED_SEQUENCE_FIELD = "nested-sequence-field"; public static final String COUNT_LIMIT = "count-limit"; @@ -2943,6 +2945,13 @@ public List fieldNestedUpdateAggNestedKey(String fieldName) { return Arrays.stream(keyString.split(",")).map(String::trim).collect(Collectors.toList()); } + public NestedKeyNullStrategy fieldNestedUpdateAggNestedKeyNullStrategy(String fieldName) { + return options.get( + key(FIELDS_PREFIX + "." + fieldName + "." + NESTED_KEY_NULL_STRATEGY) + .enumType(NestedKeyNullStrategy.class) + .noDefaultValue()); + } + public List fieldNestedUpdateAggNestedSequenceField(String fieldName) { String keyString = options.get( @@ -5046,4 +5055,40 @@ public InlineElement getDescription() { return text(description); } } + + /** + * Strategy for handling rows whose nested-key contains null values. 
+ */ + public enum NestedKeyNullStrategy implements DescribedEnum { + + MERGE( + "merge", + "Merge rows even if the nested-key contains null values, without enforcing primary key semantics."), + + IGNORE( + "ignore", + "Ignore rows whose nested-key contains null values because they do not satisfy primary key semantics."), + + ERROR( + "error", + "Throw an exception if the nested-key contains null values, because primary key fields must not be null."); + + private final String value; + private final String description; + + NestedKeyNullStrategy(String value, String description) { + this.value = value; + this.description = description; + } + + @Override + public InlineElement getDescription() { + return text(description); + } + + @Override + public String toString() { + return value; + } + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java index 03c4c78500a0..9e72e8322f28 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java @@ -18,6 +18,7 @@ package org.apache.paimon.mergetree.compact.aggregate; +import org.apache.paimon.CoreOptions; import org.apache.paimon.codegen.Projection; import org.apache.paimon.codegen.RecordComparator; import org.apache.paimon.codegen.RecordEqualiser; @@ -32,6 +33,7 @@ import javax.annotation.Nullable; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -61,17 +63,14 @@ public class FieldNestedUpdateAgg extends FieldAggregator { @Nullable private final RecordComparator sequenceComparator; private final boolean hasSequenceField; + private final CoreOptions.NestedKeyNullStrategy nestedKeyNullStrategy; private final int countLimit; - public 
FieldNestedUpdateAgg( - String name, ArrayType dataType, List nestedKey, int countLimit) { - this(name, dataType, nestedKey, Collections.emptyList(), countLimit); - } - public FieldNestedUpdateAgg( String name, ArrayType dataType, List nestedKey, + CoreOptions.NestedKeyNullStrategy nestedKeyNullStrategy, List nestedSequenceField, int countLimit) { super(name, dataType); @@ -85,12 +84,24 @@ public FieldNestedUpdateAgg( this.elementEqualiser = null; } + checkArgument( + nestedKeyNullStrategy == null || this.keyProjection != null, + "Option 'fields..nested-key-null-strategy' requires " + + "'fields..nested-key' to be configured."); + + // Default to MERGE to preserve the previous behavior. + this.nestedKeyNullStrategy = + nestedKeyNullStrategy == null + ? CoreOptions.NestedKeyNullStrategy.MERGE + : nestedKeyNullStrategy; + // If nestedSequenceField is set, we need to compare sequence fields to determine // whether to update. Only update when the new sequence is greater than the old one. if (!nestedSequenceField.isEmpty()) { checkArgument( this.keyProjection != null, - "nested-sequence-field requires nested-key to be set."); + "Option 'fields..nested-sequence-field' requires " + + "'fields..nested-key' to be configured."); this.sequenceProjection = newProjection(nestedType, nestedSequenceField); this.hasSequenceField = true; @@ -243,6 +254,11 @@ private void addNestedRows( InternalRow row = array.getRow(i, nestedFields); BinaryRow key = keyProjection.apply(row).copy(); + + if (!applyNestedKeyNullStrategy(key)) { + continue; + } + InternalRow existing = rows.get(key); if (existing != null) { if (!hasSequenceField || compareSequence(row, existing) >= 0) { @@ -253,4 +269,27 @@ private void addNestedRows( } } } + + private boolean applyNestedKeyNullStrategy(BinaryRow key) { + if (!key.anyNull()) { + // The nested-key satisfies primary key semantics. + return true; + } + switch (nestedKeyNullStrategy) { + case MERGE: + // Preserve the previous behavior. 
+ return true; + case IGNORE: + return false; + case ERROR: + throw new IllegalArgumentException( + "Nested key contains null values. Primary key fields must not be null."); + default: + throw new UnsupportedOperationException( + String.format( + "Unsupported nested-key-null-strategy '%s'. Supported values are: %s.", + nestedKeyNullStrategy, + Arrays.toString(CoreOptions.NestedKeyNullStrategy.values()))); + } + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldNestedUpdateAggFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldNestedUpdateAggFactory.java index ca0d9614986a..9c8746f0f6e8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldNestedUpdateAggFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldNestedUpdateAggFactory.java @@ -39,6 +39,7 @@ public FieldNestedUpdateAgg create(DataType fieldType, CoreOptions options, Stri return createFieldNestedUpdateAgg( fieldType, options.fieldNestedUpdateAggNestedKey(field), + options.fieldNestedUpdateAggNestedKeyNullStrategy(field), options.fieldNestedUpdateAggNestedSequenceField(field), options.fieldNestedUpdateAggCountLimit(field)); } @@ -51,6 +52,7 @@ public String identifier() { private FieldNestedUpdateAgg createFieldNestedUpdateAgg( DataType fieldType, List nestedKey, + CoreOptions.NestedKeyNullStrategy nestedKeyNullStrategy, List nestedSequenceField, int countLimit) { if (nestedKey == null) { @@ -68,6 +70,6 @@ private FieldNestedUpdateAgg createFieldNestedUpdateAgg( checkArgument(arrayType.getElementType() instanceof RowType, typeErrorMsg, fieldType); return new FieldNestedUpdateAgg( - identifier(), arrayType, nestedKey, nestedSequenceField, countLimit); + identifier(), arrayType, nestedKey, nestedKeyNullStrategy, nestedSequenceField, countLimit); } } diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java index c791fd149cdb..80601bf3d229 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java @@ -734,6 +734,8 @@ public void testFieldNestedUpdateAgg() { DataTypes.FIELD(1, "k1", DataTypes.INT()), DataTypes.FIELD(2, "v", DataTypes.STRING()))), Arrays.asList("k0", "k1"), + null, + Collections.emptyList(), Integer.MAX_VALUE); InternalArray accumulator; @@ -773,6 +775,8 @@ public void testFieldNestedAppendAgg() { FieldNestedUpdateAggFactory.NAME, DataTypes.ARRAY(elementRowType), Collections.emptyList(), + null, + Collections.emptyList(), Integer.MAX_VALUE); InternalArray accumulator = null; @@ -807,6 +811,8 @@ public void testFieldNestedAppendAggWithCountLimit() { FieldNestedUpdateAggFactory.NAME, DataTypes.ARRAY(elementRowType), Collections.emptyList(), + null, + Collections.emptyList(), 2); InternalArray accumulator = null; @@ -842,6 +848,8 @@ public void testFieldNestedAppendAggWithCountLimitOnFirstInputArray() { FieldNestedUpdateAggFactory.NAME, DataTypes.ARRAY(elementRowType), Collections.emptyList(), + null, + Collections.emptyList(), 2); InternalArray.ElementGetter elementGetter = @@ -867,6 +875,8 @@ public void testFieldNestedUpdateAggWithCountLimitUpdatesExistingKeyAtLimitWitho FieldNestedUpdateAggFactory.NAME, DataTypes.ARRAY(elementRowType), Arrays.asList("k0", "k1"), + null, + Collections.emptyList(), 2); InternalArray accumulator = null; @@ -900,6 +910,8 @@ public void testFieldNestedUpdateAggWithCountLimitOnFirstInputArrayWithoutSequen FieldNestedUpdateAggFactory.NAME, DataTypes.ARRAY(elementRowType), Arrays.asList("k0", "k1"), + null, + Collections.emptyList(), 2); InternalArray.ElementGetter elementGetter = 
@@ -932,6 +944,7 @@ public void testFieldNestedUpdateAggWithSequenceField() { FieldNestedUpdateAggFactory.NAME, DataTypes.ARRAY(elementRowType), Arrays.asList("k0", "k1"), + null, Collections.singletonList("seq"), Integer.MAX_VALUE); @@ -983,6 +996,7 @@ public void testFieldNestedUpdateAggWithMultipleSequenceFields() { FieldNestedUpdateAggFactory.NAME, DataTypes.ARRAY(elementRowType), Arrays.asList("k0", "k1"), + null, Arrays.asList("seq", "ts"), Integer.MAX_VALUE); @@ -1089,10 +1103,12 @@ public void testFieldNestedUpdateAggWithSequenceFieldWithoutNestedKey() { FieldNestedUpdateAggFactory.NAME, DataTypes.ARRAY(elementRowType), Collections.emptyList(), + null, Collections.singletonList("seq"), Integer.MAX_VALUE)) .isInstanceOf(IllegalArgumentException.class) - .hasMessage("nested-sequence-field requires nested-key to be set."); + .hasMessage("Option 'fields..nested-sequence-field' requires " + + "'fields..nested-key' to be configured."); } @Test @@ -1111,10 +1127,13 @@ public void testFieldNestedUpdateAggWithCountLimitWithSequenceFieldWithoutNested FieldNestedUpdateAggFactory.NAME, DataTypes.ARRAY(elementRowType), Collections.emptyList(), + null, Collections.singletonList("seq"), 2)) .isInstanceOf(IllegalArgumentException.class) - .hasMessage("nested-sequence-field requires nested-key to be set."); + .hasMessage("Option 'fields..nested-sequence-field' requires " + + "'fields..nested-key' to be configured." 
+ ); } @Test @@ -1131,6 +1150,7 @@ public void testFieldNestedUpdateAggWithCountLimitWithSequenceField() { FieldNestedUpdateAggFactory.NAME, DataTypes.ARRAY(elementRowType), Arrays.asList("k0", "k1"), + null, Collections.singletonList("seq"), 2); // Enforce count limit = 2 @@ -1179,6 +1199,7 @@ public void testFieldNestedUpdateAggWithCountLimitUpdatesExistingKeyAtLimit() { FieldNestedUpdateAggFactory.NAME, DataTypes.ARRAY(elementRowType), Arrays.asList("k0", "k1"), + null, Collections.singletonList("seq"), 2); @@ -1215,6 +1236,7 @@ public void testFieldNestedUpdateAggWithCountLimitOnFirstInputArrayWithSequence( FieldNestedUpdateAggFactory.NAME, DataTypes.ARRAY(elementRowType), Arrays.asList("k0", "k1"), + null, Collections.singletonList("seq"), 2); @@ -1235,6 +1257,506 @@ public void testFieldNestedUpdateAggWithCountLimitOnFirstInputArrayWithSequence( Arrays.asList(row(0, 1, "B_updated", 4), row(1, 2, "C", 3))); } + @Test + public void testFieldNestedUpdateAggWithNestedKeyNullStrategyArgumentCheck() { + DataType elementRowType = + DataTypes.ROW( + DataTypes.FIELD(0, "k0", DataTypes.INT()), + DataTypes.FIELD(1, "k1", DataTypes.INT()), + DataTypes.FIELD(2, "v", DataTypes.STRING()), + DataTypes.FIELD(3, "seq", DataTypes.INT())); + + // ============================================================ + // 1. nested-key is not specified, but nested-key-null-strategy is provided + // This should be rejected because strategy depends on nested-key. 
+ // ============================================================ + org.assertj.core.api.Assertions.assertThatThrownBy( + () -> + new FieldNestedUpdateAgg( + FieldNestedUpdateAggFactory.NAME, + DataTypes.ARRAY(elementRowType), + Collections.emptyList(), + CoreOptions.NestedKeyNullStrategy.MERGE, + Collections.emptyList(), + 2)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage( + "Option 'fields..nested-key-null-strategy' requires " + + "'fields..nested-key' to be configured."); + + // ============================================================ + // 2. neither nested-key nor nested-key-null-strategy is specified + // This is valid and should fall back to default behavior (MERGE). + // ============================================================ + FieldNestedUpdateAgg agg1 = + new FieldNestedUpdateAgg( + FieldNestedUpdateAggFactory.NAME, + DataTypes.ARRAY(elementRowType), + Collections.emptyList(), + null, + Collections.emptyList(), + 2); + + org.assertj.core.api.Assertions.assertThat(agg1).isNotNull(); + + // ============================================================ + // 3. nested-key is specified but nested-key-null-strategy is not + // This is valid and should use default strategy (MERGE). + // ============================================================ + FieldNestedUpdateAgg agg2 = + new FieldNestedUpdateAgg( + FieldNestedUpdateAggFactory.NAME, + DataTypes.ARRAY(elementRowType), + Collections.singletonList("k0"), + null, + Collections.emptyList(), + 2); + + org.assertj.core.api.Assertions.assertThat(agg2).isNotNull(); + + // ============================================================ + // 4. both nested-key and nested-key-null-strategy are specified + // This should be accepted and use the provided strategy. 
+ // ============================================================ + FieldNestedUpdateAgg agg3 = + new FieldNestedUpdateAgg( + FieldNestedUpdateAggFactory.NAME, + DataTypes.ARRAY(elementRowType), + Collections.singletonList("k0"), + CoreOptions.NestedKeyNullStrategy.MERGE, + Collections.emptyList(), + 2); + + org.assertj.core.api.Assertions.assertThat(agg3).isNotNull(); + } + + @Test + public void testFieldNestedUpdateAggWithoutNestedKeyNullStrategy() { + DataType elementRowType = + DataTypes.ROW( + DataTypes.FIELD(0, "k0", DataTypes.INT()), + DataTypes.FIELD(1, "k1", DataTypes.INT()), + DataTypes.FIELD(2, "v", DataTypes.STRING()), + DataTypes.FIELD(3, "seq", DataTypes.INT())); + + FieldNestedUpdateAgg agg = + new FieldNestedUpdateAgg( + FieldNestedUpdateAggFactory.NAME, + DataTypes.ARRAY(elementRowType), + Arrays.asList("k0", "k1"), + null, // Preserve the previous behavior. + Collections.emptyList(), + Integer.MAX_VALUE); + + + InternalArray accumulator; + InternalArray.ElementGetter elementGetter = + InternalArray.createElementGetter(elementRowType); + + InternalRow current = row(0, 0, "A", 1); + accumulator = (InternalArray) agg.agg(null, singletonArray(current)); + assertThat(unnest(accumulator, elementGetter)) + .containsExactlyInAnyOrderElementsOf(Collections.singletonList(current)); + + current = row(0, 1, "B", 2); + accumulator = (InternalArray) agg.agg(accumulator, singletonArray(current)); + assertThat(unnest(accumulator, elementGetter)) + .containsExactlyInAnyOrderElementsOf( + Arrays.asList(row(0, 0, "A", 1), row(0, 1, "B", 2))); + + current = row(0, null, "C", 3); + accumulator = (InternalArray) agg.agg(accumulator, singletonArray(current)); + assertThat(unnest(accumulator, elementGetter)) + .containsExactlyInAnyOrderElementsOf( + Arrays.asList( + row(0, 0, "A", 1), + row(0, 1, "B", 2), + row(0, null, "C", 3) + )); + + current = row(null, null, "D", 4); + accumulator = (InternalArray) agg.agg(accumulator, singletonArray(current)); + 
assertThat(unnest(accumulator, elementGetter)) + .containsExactlyInAnyOrderElementsOf( + Arrays.asList( + row(0, 0, "A", 1), + row(0, 1, "B", 2), + row(0, null, "C", 3), + row(null, null, "D", 4) + )); + } + + @Test + public void testFieldNestedUpdateAggWhenNestedKeyNullUseMergeStrategy() { + DataType elementRowType = + DataTypes.ROW( + DataTypes.FIELD(0, "k0", DataTypes.INT()), + DataTypes.FIELD(1, "k1", DataTypes.INT()), + DataTypes.FIELD(2, "v", DataTypes.STRING()), + DataTypes.FIELD(3, "seq", DataTypes.INT())); + + FieldNestedUpdateAgg agg = + new FieldNestedUpdateAgg( + FieldNestedUpdateAggFactory.NAME, + DataTypes.ARRAY(elementRowType), + Arrays.asList("k0", "k1"), + CoreOptions.NestedKeyNullStrategy.MERGE, // use merge strategy + Collections.emptyList(), + Integer.MAX_VALUE); + + InternalArray.ElementGetter elementGetter = + InternalArray.createElementGetter(elementRowType); + + // ============================================================ + // when the accumulator is empty + // ============================================================ + InternalArray accumulatorEmpty; + InternalRow current = row(0, null, "C", 3); + + // case 1: partially null nested key (some PK fields are null) + accumulatorEmpty = (InternalArray) agg.agg(null, singletonArray(current)); + assertThat(unnest(accumulatorEmpty, elementGetter)) + .containsExactlyInAnyOrderElementsOf(Collections.singletonList(current)); + + // case 2: fully null nested key (all PK fields are null) + current = row(null, null, "D", 4); + accumulatorEmpty = (InternalArray) agg.agg(null, singletonArray(current)); + assertThat(unnest(accumulatorEmpty, elementGetter)) + .containsExactlyInAnyOrderElementsOf(Collections.singletonList(current)); + + + // ============================================================ + // when the accumulator is non-empty + // ============================================================ + InternalArray accumulator; + + current = row(0, 0, "A", 1); + accumulator = (InternalArray) 
agg.agg(null, singletonArray(current)); + assertThat(unnest(accumulator, elementGetter)) + .containsExactlyInAnyOrderElementsOf(Collections.singletonList(current)); + + current = row(0, 1, "B", 2); + accumulator = (InternalArray) agg.agg(accumulator, singletonArray(current)); + assertThat(unnest(accumulator, elementGetter)) + .containsExactlyInAnyOrderElementsOf( + Arrays.asList(row(0, 0, "A", 1), row(0, 1, "B", 2))); + + // case 1: partially null nested key (some PK fields are null) + current = row(0, null, "C", 3); + accumulator = (InternalArray) agg.agg(accumulator, singletonArray(current)); + assertThat(unnest(accumulator, elementGetter)) + .containsExactlyInAnyOrderElementsOf( + Arrays.asList( + row(0, 0, "A", 1), + row(0, 1, "B", 2), + row(0, null, "C", 3) + )); + + // case 2: fully null nested key (all PK fields are null) + current = row(null, null, "D", 4); + accumulator = (InternalArray) agg.agg(accumulator, singletonArray(current)); + assertThat(unnest(accumulator, elementGetter)) + .containsExactlyInAnyOrderElementsOf( + Arrays.asList( + row(0, 0, "A", 1), + row(0, 1, "B", 2), + row(0, null, "C", 3), + row(null, null, "D", 4) + )); + } + + @Test + public void testFieldNestedUpdateAggWhenNestedKeyNullUseIgnoreStrategy() { + DataType elementRowType = + DataTypes.ROW( + DataTypes.FIELD(0, "k0", DataTypes.INT()), + DataTypes.FIELD(1, "k1", DataTypes.INT()), + DataTypes.FIELD(2, "v", DataTypes.STRING()), + DataTypes.FIELD(3, "seq", DataTypes.INT())); + + FieldNestedUpdateAgg agg = + new FieldNestedUpdateAgg( + FieldNestedUpdateAggFactory.NAME, + DataTypes.ARRAY(elementRowType), + Arrays.asList("k0", "k1"), + CoreOptions.NestedKeyNullStrategy.IGNORE, // use ignore strategy + Collections.emptyList(), + Integer.MAX_VALUE); + + InternalArray.ElementGetter elementGetter = + InternalArray.createElementGetter(elementRowType); + + + // ============================================================ + // when the accumulator is empty + // 
============================================================ + InternalArray accumulatorEmpty; + + // case 1: partially null nested key (some PK fields are null) + accumulatorEmpty = (InternalArray) agg.agg(null, singletonArray(row(0, null, "C", 3))); + assertThat(unnest(accumulatorEmpty, elementGetter)).isEmpty(); + + // case 2: fully null nested key (all PK fields are null) + accumulatorEmpty = (InternalArray) agg.agg(null, singletonArray(row(null, null, "D", 4))); + assertThat(unnest(accumulatorEmpty, elementGetter)).isEmpty(); + + + // ============================================================ + // when the accumulator is non-empty + // ============================================================ + InternalArray accumulator; + + InternalRow current = row(0, 0, "A", 1); + accumulator = (InternalArray) agg.agg(null, singletonArray(current)); + assertThat(unnest(accumulator, elementGetter)) + .containsExactlyInAnyOrderElementsOf(Collections.singletonList(current)); + + current = row(0, 1, "B", 2); + accumulator = (InternalArray) agg.agg(accumulator, singletonArray(current)); + assertThat(unnest(accumulator, elementGetter)) + .containsExactlyInAnyOrderElementsOf( + Arrays.asList(row(0, 0, "A", 1), row(0, 1, "B", 2))); + + // case 1: partially null nested key (some PK fields are null) + current = row(0, null, "C", 3); + accumulator = (InternalArray) agg.agg(accumulator, singletonArray(current)); + assertThat(unnest(accumulator, elementGetter)) + .containsExactlyInAnyOrderElementsOf( + Arrays.asList( + row(0, 0, "A", 1), + row(0, 1, "B", 2) + )); + + // case 2: fully null nested key (all PK fields are null) + current = row(null, null, "D", 4); + accumulator = (InternalArray) agg.agg(accumulator, singletonArray(current)); + assertThat(unnest(accumulator, elementGetter)) + .containsExactlyInAnyOrderElementsOf( + Arrays.asList( + row(0, 0, "A", 1), + row(0, 1, "B", 2) + )); + } + + @Test + public void testFieldNestedUpdateAggWhenNestedKeyNullUseThrowErrorStrategy() { 
+ DataType elementRowType = + DataTypes.ROW( + DataTypes.FIELD(0, "k0", DataTypes.INT()), + DataTypes.FIELD(1, "k1", DataTypes.INT()), + DataTypes.FIELD(2, "v", DataTypes.STRING()), + DataTypes.FIELD(3, "seq", DataTypes.INT())); + + FieldNestedUpdateAgg agg = + new FieldNestedUpdateAgg( + FieldNestedUpdateAggFactory.NAME, + DataTypes.ARRAY(elementRowType), + Arrays.asList("k0", "k1"), + CoreOptions.NestedKeyNullStrategy.ERROR, // use error strategy + Collections.emptyList(), + Integer.MAX_VALUE); + + + // ============================================================ + // when the accumulator is empty + // ============================================================ + + // case 1: partially null nested key (some PK fields are null) + assertThatThrownBy(() -> agg.agg( + null, + singletonArray(row(0, null, "C", 3)) + )) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Nested key contains null values. Primary key fields must not be null."); + + // case 2: fully null nested key (all PK fields are null) + assertThatThrownBy(() -> agg.agg( + null, + singletonArray(row(null, null, "D", 4)) + )) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Nested key contains null values. 
Primary key fields must not be null."); + + + // ============================================================ + // when the accumulator is non-empty + // ============================================================ + InternalArray accumulator; + InternalArray.ElementGetter elementGetter = + InternalArray.createElementGetter(elementRowType); + + InternalRow current = row(0, 0, "A", 1); + accumulator = (InternalArray) agg.agg(null, singletonArray(current)); + assertThat(unnest(accumulator, elementGetter)) + .containsExactlyInAnyOrderElementsOf(Collections.singletonList(current)); + + current = row(0, 1, "B", 2); + accumulator = (InternalArray) agg.agg(accumulator, singletonArray(current)); + assertThat(unnest(accumulator, elementGetter)) + .containsExactlyInAnyOrderElementsOf( + Arrays.asList(row(0, 0, "A", 1), row(0, 1, "B", 2))); + + // case 1: partially null nested key (some PK fields are null) + InternalArray finalAccumulator1 = accumulator; + assertThatThrownBy(() -> agg.agg( + finalAccumulator1, + singletonArray(row(0, null, "C", 3)) + )) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Nested key contains null values. Primary key fields must not be null."); + + assertThat(unnest(accumulator, elementGetter)) + .containsExactlyInAnyOrderElementsOf( + Arrays.asList(row(0, 0, "A", 1), row(0, 1, "B", 2))); + + // case 2: fully null nested key (all PK fields are null) + InternalArray finalAccumulator2 = accumulator; + assertThatThrownBy(() -> agg.agg( + finalAccumulator2, singletonArray(row(null, null, "D", 4)) + )) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Nested key contains null values. 
Primary key fields must not be null."); + + assertThat(unnest(accumulator, elementGetter)) + .containsExactlyInAnyOrderElementsOf( + Arrays.asList(row(0, 0, "A", 1), row(0, 1, "B", 2))); + } + + @Test + public void testFieldNestedUpdateAggWithCountLimitWhenNestedKeyNullUseMergeStrategy() { + DataType elementRowType = + DataTypes.ROW( + DataTypes.FIELD(0, "k0", DataTypes.INT()), + DataTypes.FIELD(1, "k1", DataTypes.INT()), + DataTypes.FIELD(2, "v", DataTypes.STRING()), + DataTypes.FIELD(3, "seq", DataTypes.INT())); + + FieldNestedUpdateAgg agg = + new FieldNestedUpdateAgg( + FieldNestedUpdateAggFactory.NAME, + DataTypes.ARRAY(elementRowType), + Arrays.asList("k0", "k1"), + CoreOptions.NestedKeyNullStrategy.MERGE, // use merge strategy + Collections.singletonList("seq"), + 3); + + InternalArray accumulator = null; + InternalArray.ElementGetter elementGetter = + InternalArray.createElementGetter(elementRowType); + + accumulator = (InternalArray) agg.agg(accumulator, singletonArray(row(0, 1, "B", 1))); + accumulator = (InternalArray) agg.agg(accumulator, singletonArray(row(null, 2, "NULL_2", 2))); + accumulator = (InternalArray) agg.agg(accumulator, singletonArray(row(null, null, "NULL_NULL", 3))); + accumulator = (InternalArray) agg.agg(accumulator, singletonArray(row(1, 2, "C", 5))); + + accumulator = + (InternalArray) agg.agg(accumulator, singletonArray(row(0, 1, "B_updated", 4))); + assertThat(unnest(accumulator, elementGetter)) + .containsExactlyInAnyOrderElementsOf( + Arrays.asList( + row(0, 1, "B_updated", 4), + row(null, 2, "NULL_2", 2), + row(null, null, "NULL_NULL", 3) + )); + } + + @Test + public void testFieldNestedUpdateAggWithCountLimitWhenNestedKeyNullUseIgnoreStrategy() { + DataType elementRowType = + DataTypes.ROW( + DataTypes.FIELD(0, "k0", DataTypes.INT()), + DataTypes.FIELD(1, "k1", DataTypes.INT()), + DataTypes.FIELD(2, "v", DataTypes.STRING()), + DataTypes.FIELD(3, "seq", DataTypes.INT())); + + FieldNestedUpdateAgg agg = + new 
FieldNestedUpdateAgg( + FieldNestedUpdateAggFactory.NAME, + DataTypes.ARRAY(elementRowType), + Arrays.asList("k0", "k1"), + CoreOptions.NestedKeyNullStrategy.IGNORE, // use ignore strategy + Collections.singletonList("seq"), + 3); + + InternalArray accumulator = null; + InternalArray.ElementGetter elementGetter = + InternalArray.createElementGetter(elementRowType); + + accumulator = (InternalArray) agg.agg(accumulator, singletonArray(row(0, 1, "B", 1))); + accumulator = (InternalArray) agg.agg(accumulator, singletonArray(row(null, 2, "NULL_2", 2))); + accumulator = (InternalArray) agg.agg(accumulator, singletonArray(row(null, null, "NULL_NULL", 3))); + accumulator = (InternalArray) agg.agg(accumulator, singletonArray(row(1, 2, "C", 3))); + + accumulator = + (InternalArray) agg.agg(accumulator, singletonArray(row(0, 1, "B_updated", 4))); + assertThat(unnest(accumulator, elementGetter)) + .containsExactlyInAnyOrderElementsOf( + Arrays.asList(row(0, 1, "B_updated", 4), row(1, 2, "C", 3))); + + accumulator = (InternalArray) agg.agg(accumulator, singletonArray(row(2, 3, "D", 5))); + assertThat(unnest(accumulator, elementGetter)) + .containsExactlyInAnyOrderElementsOf( + Arrays.asList( + row(0, 1, "B_updated", 4), + row(1, 2, "C", 3), + row(2, 3, "D", 5) + )); + } + + @Test + public void testFieldNestedUpdateAggWithCountLimitWhenNestedKeyNullUseThrowErrorStrategy() { + DataType elementRowType = + DataTypes.ROW( + DataTypes.FIELD(0, "k0", DataTypes.INT()), + DataTypes.FIELD(1, "k1", DataTypes.INT()), + DataTypes.FIELD(2, "v", DataTypes.STRING()), + DataTypes.FIELD(3, "seq", DataTypes.INT())); + + FieldNestedUpdateAgg agg = + new FieldNestedUpdateAgg( + FieldNestedUpdateAggFactory.NAME, + DataTypes.ARRAY(elementRowType), + Arrays.asList("k0", "k1"), + CoreOptions.NestedKeyNullStrategy.ERROR, // use error strategy + Collections.singletonList("seq"), + 3); + + InternalArray accumulator = null; + InternalArray.ElementGetter elementGetter = + 
InternalArray.createElementGetter(elementRowType); + + accumulator = (InternalArray) agg.agg(accumulator, singletonArray(row(0, 1, "B", 1))); + + InternalArray finalAccumulator = accumulator; + assertThatThrownBy(() -> + agg.agg(finalAccumulator, singletonArray(row(null, 2, "NULL_2", 2))) + ) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Nested key contains null values. Primary key fields must not be null."); + + assertThatThrownBy(() -> + agg.agg(finalAccumulator, singletonArray(row(null, null, "NULL_NULL", 3))) + ) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Nested key contains null values. Primary key fields must not be null."); + + accumulator = (InternalArray) agg.agg(accumulator, singletonArray(row(1, 2, "C", 3))); + + accumulator = (InternalArray) agg.agg(accumulator, singletonArray(row(0, 1, "B_updated", 4))); + assertThat(unnest(accumulator, elementGetter)) + .containsExactlyInAnyOrderElementsOf( + Arrays.asList(row(0, 1, "B_updated", 4), row(1, 2, "C", 3))); + + accumulator = (InternalArray) agg.agg(accumulator, singletonArray(row(2, 3, "D", 5))); + assertThat(unnest(accumulator, elementGetter)) + .containsExactlyInAnyOrderElementsOf( + Arrays.asList( + row(0, 1, "B_updated", 4), + row(1, 2, "C", 3), + row(2, 3, "D", 5) + )); + } + private List unnest(InternalArray array, InternalArray.ElementGetter elementGetter) { return IntStream.range(0, array.size()) .mapToObj(i -> elementGetter.getElementOrNull(array, i)) From 001cd046ece230940e8c133cf02e9f423ee08934 Mon Sep 17 00:00:00 2001 From: AuroraVoyage Date: Sun, 28 Jun 2026 15:10:49 +0800 Subject: [PATCH 2/3] [core] Introduce nested-key-null-strategy for handling non-PK-compliant nested keys in FieldNestedUpdateAgg --- .../java/org/apache/paimon/CoreOptions.java | 5 +- .../aggregate/FieldNestedUpdateAgg.java | 1 - .../factory/FieldNestedUpdateAggFactory.java | 7 +- .../aggregate/FieldAggregatorTest.java | 140 ++++++++---------- 4 files changed, 65 insertions(+), 88 
deletions(-) diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index e6fd6137bad9..f0619a1e2ddd 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -5056,11 +5056,8 @@ public InlineElement getDescription() { } } - /** - * Strategy for handling rows whose nested-key contains null values. - */ + /** Strategy for handling rows whose nested-key contains null values. */ public enum NestedKeyNullStrategy implements DescribedEnum { - MERGE( "merge", "Merge rows even if the nested-key contains null values, without enforcing primary key semantics."), diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java index 9e72e8322f28..a4d0880d6f46 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java @@ -34,7 +34,6 @@ import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldNestedUpdateAggFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldNestedUpdateAggFactory.java index 9c8746f0f6e8..39bd5502e8df 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldNestedUpdateAggFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldNestedUpdateAggFactory.java @@ -70,6 +70,11 @@ private FieldNestedUpdateAgg createFieldNestedUpdateAgg( checkArgument(arrayType.getElementType() 
instanceof RowType, typeErrorMsg, fieldType); return new FieldNestedUpdateAgg( - identifier(), arrayType, nestedKey, nestedKeyNullStrategy, nestedSequenceField, countLimit); + identifier(), + arrayType, + nestedKey, + nestedKeyNullStrategy, + nestedSequenceField, + countLimit); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java index 80601bf3d229..b46bfd9bcfef 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java @@ -1107,8 +1107,9 @@ public void testFieldNestedUpdateAggWithSequenceFieldWithoutNestedKey() { Collections.singletonList("seq"), Integer.MAX_VALUE)) .isInstanceOf(IllegalArgumentException.class) - .hasMessage("Option 'fields..nested-sequence-field' requires " - + "'fields..nested-key' to be configured."); + .hasMessage( + "Option 'fields..nested-sequence-field' requires " + + "'fields..nested-key' to be configured."); } @Test @@ -1131,9 +1132,9 @@ public void testFieldNestedUpdateAggWithCountLimitWithSequenceFieldWithoutNested Collections.singletonList("seq"), 2)) .isInstanceOf(IllegalArgumentException.class) - .hasMessage("Option 'fields..nested-sequence-field' requires " + - "'fields..nested-key' to be configured." - ); + .hasMessage( + "Option 'fields..nested-sequence-field' requires " + + "'fields..nested-key' to be configured."); } @Test @@ -1344,11 +1345,10 @@ public void testFieldNestedUpdateAggWithoutNestedKeyNullStrategy() { FieldNestedUpdateAggFactory.NAME, DataTypes.ARRAY(elementRowType), Arrays.asList("k0", "k1"), - null, // Preserve the previous behavior. + null, // Preserve the previous behavior. 
Collections.emptyList(), Integer.MAX_VALUE); - InternalArray accumulator; InternalArray.ElementGetter elementGetter = InternalArray.createElementGetter(elementRowType); @@ -1368,11 +1368,7 @@ public void testFieldNestedUpdateAggWithoutNestedKeyNullStrategy() { accumulator = (InternalArray) agg.agg(accumulator, singletonArray(current)); assertThat(unnest(accumulator, elementGetter)) .containsExactlyInAnyOrderElementsOf( - Arrays.asList( - row(0, 0, "A", 1), - row(0, 1, "B", 2), - row(0, null, "C", 3) - )); + Arrays.asList(row(0, 0, "A", 1), row(0, 1, "B", 2), row(0, null, "C", 3))); current = row(null, null, "D", 4); accumulator = (InternalArray) agg.agg(accumulator, singletonArray(current)); @@ -1382,8 +1378,7 @@ public void testFieldNestedUpdateAggWithoutNestedKeyNullStrategy() { row(0, 0, "A", 1), row(0, 1, "B", 2), row(0, null, "C", 3), - row(null, null, "D", 4) - )); + row(null, null, "D", 4))); } @Test @@ -1400,7 +1395,7 @@ public void testFieldNestedUpdateAggWhenNestedKeyNullUseMergeStrategy() { FieldNestedUpdateAggFactory.NAME, DataTypes.ARRAY(elementRowType), Arrays.asList("k0", "k1"), - CoreOptions.NestedKeyNullStrategy.MERGE, // use merge strategy + CoreOptions.NestedKeyNullStrategy.MERGE, // use merge strategy Collections.emptyList(), Integer.MAX_VALUE); @@ -1424,7 +1419,6 @@ public void testFieldNestedUpdateAggWhenNestedKeyNullUseMergeStrategy() { assertThat(unnest(accumulatorEmpty, elementGetter)) .containsExactlyInAnyOrderElementsOf(Collections.singletonList(current)); - // ============================================================ // when the accumulator is non-empty // ============================================================ @@ -1446,11 +1440,7 @@ public void testFieldNestedUpdateAggWhenNestedKeyNullUseMergeStrategy() { accumulator = (InternalArray) agg.agg(accumulator, singletonArray(current)); assertThat(unnest(accumulator, elementGetter)) .containsExactlyInAnyOrderElementsOf( - Arrays.asList( - row(0, 0, "A", 1), - row(0, 1, "B", 2), - 
row(0, null, "C", 3) - )); + Arrays.asList(row(0, 0, "A", 1), row(0, 1, "B", 2), row(0, null, "C", 3))); // case 2: fully null nested key (all PK fields are null) current = row(null, null, "D", 4); @@ -1461,8 +1451,7 @@ public void testFieldNestedUpdateAggWhenNestedKeyNullUseMergeStrategy() { row(0, 0, "A", 1), row(0, 1, "B", 2), row(0, null, "C", 3), - row(null, null, "D", 4) - )); + row(null, null, "D", 4))); } @Test @@ -1479,14 +1468,13 @@ public void testFieldNestedUpdateAggWhenNestedKeyNullUseIgnoreStrategy() { FieldNestedUpdateAggFactory.NAME, DataTypes.ARRAY(elementRowType), Arrays.asList("k0", "k1"), - CoreOptions.NestedKeyNullStrategy.IGNORE, // use ignore strategy + CoreOptions.NestedKeyNullStrategy.IGNORE, // use ignore strategy Collections.emptyList(), Integer.MAX_VALUE); InternalArray.ElementGetter elementGetter = InternalArray.createElementGetter(elementRowType); - // ============================================================ // when the accumulator is empty // ============================================================ @@ -1500,7 +1488,6 @@ public void testFieldNestedUpdateAggWhenNestedKeyNullUseIgnoreStrategy() { accumulatorEmpty = (InternalArray) agg.agg(null, singletonArray(row(null, null, "D", 4))); assertThat(unnest(accumulatorEmpty, elementGetter)).isEmpty(); - // ============================================================ // when the accumulator is non-empty // ============================================================ @@ -1522,20 +1509,14 @@ public void testFieldNestedUpdateAggWhenNestedKeyNullUseIgnoreStrategy() { accumulator = (InternalArray) agg.agg(accumulator, singletonArray(current)); assertThat(unnest(accumulator, elementGetter)) .containsExactlyInAnyOrderElementsOf( - Arrays.asList( - row(0, 0, "A", 1), - row(0, 1, "B", 2) - )); + Arrays.asList(row(0, 0, "A", 1), row(0, 1, "B", 2))); // case 2: fully null nested key (all PK fields are null) current = row(null, null, "D", 4); accumulator = (InternalArray) agg.agg(accumulator, 
singletonArray(current)); assertThat(unnest(accumulator, elementGetter)) .containsExactlyInAnyOrderElementsOf( - Arrays.asList( - row(0, 0, "A", 1), - row(0, 1, "B", 2) - )); + Arrays.asList(row(0, 0, "A", 1), row(0, 1, "B", 2))); } @Test @@ -1552,31 +1533,25 @@ public void testFieldNestedUpdateAggWhenNestedKeyNullUseThrowErrorStrategy() { FieldNestedUpdateAggFactory.NAME, DataTypes.ARRAY(elementRowType), Arrays.asList("k0", "k1"), - CoreOptions.NestedKeyNullStrategy.ERROR, // use error strategy + CoreOptions.NestedKeyNullStrategy.ERROR, // use error strategy Collections.emptyList(), Integer.MAX_VALUE); - // ============================================================ // when the accumulator is empty // ============================================================ // case 1: partially null nested key (some PK fields are null) - assertThatThrownBy(() -> agg.agg( - null, - singletonArray(row(0, null, "C", 3)) - )) + assertThatThrownBy(() -> agg.agg(null, singletonArray(row(0, null, "C", 3)))) .isInstanceOf(IllegalArgumentException.class) - .hasMessage("Nested key contains null values. Primary key fields must not be null."); + .hasMessage( + "Nested key contains null values. Primary key fields must not be null."); // case 2: fully null nested key (all PK fields are null) - assertThatThrownBy(() -> agg.agg( - null, - singletonArray(row(null, null, "D", 4)) - )) + assertThatThrownBy(() -> agg.agg(null, singletonArray(row(null, null, "D", 4)))) .isInstanceOf(IllegalArgumentException.class) - .hasMessage("Nested key contains null values. Primary key fields must not be null."); - + .hasMessage( + "Nested key contains null values. 
Primary key fields must not be null."); // ============================================================ // when the accumulator is non-empty @@ -1598,12 +1573,10 @@ public void testFieldNestedUpdateAggWhenNestedKeyNullUseThrowErrorStrategy() { // case 1: partially null nested key (some PK fields are null) InternalArray finalAccumulator1 = accumulator; - assertThatThrownBy(() -> agg.agg( - finalAccumulator1, - singletonArray(row(0, null, "C", 3)) - )) + assertThatThrownBy(() -> agg.agg(finalAccumulator1, singletonArray(row(0, null, "C", 3)))) .isInstanceOf(IllegalArgumentException.class) - .hasMessage("Nested key contains null values. Primary key fields must not be null."); + .hasMessage( + "Nested key contains null values. Primary key fields must not be null."); assertThat(unnest(accumulator, elementGetter)) .containsExactlyInAnyOrderElementsOf( @@ -1611,11 +1584,11 @@ public void testFieldNestedUpdateAggWhenNestedKeyNullUseThrowErrorStrategy() { // case 2: fully null nested key (all PK fields are null) InternalArray finalAccumulator2 = accumulator; - assertThatThrownBy(() -> agg.agg( - finalAccumulator2, singletonArray(row(null, null, "D", 4)) - )) + assertThatThrownBy( + () -> agg.agg(finalAccumulator2, singletonArray(row(null, null, "D", 4)))) .isInstanceOf(IllegalArgumentException.class) - .hasMessage("Nested key contains null values. Primary key fields must not be null."); + .hasMessage( + "Nested key contains null values. 
Primary key fields must not be null."); assertThat(unnest(accumulator, elementGetter)) .containsExactlyInAnyOrderElementsOf( @@ -1636,7 +1609,7 @@ public void testFieldNestedUpdateAggWithCountLimitWhenNestedKeyNullUseMergeStrat FieldNestedUpdateAggFactory.NAME, DataTypes.ARRAY(elementRowType), Arrays.asList("k0", "k1"), - CoreOptions.NestedKeyNullStrategy.MERGE, // use merge strategy + CoreOptions.NestedKeyNullStrategy.MERGE, // use merge strategy Collections.singletonList("seq"), 3); @@ -1645,8 +1618,11 @@ public void testFieldNestedUpdateAggWithCountLimitWhenNestedKeyNullUseMergeStrat InternalArray.createElementGetter(elementRowType); accumulator = (InternalArray) agg.agg(accumulator, singletonArray(row(0, 1, "B", 1))); - accumulator = (InternalArray) agg.agg(accumulator, singletonArray(row(null, 2, "NULL_2", 2))); - accumulator = (InternalArray) agg.agg(accumulator, singletonArray(row(null, null, "NULL_NULL", 3))); + accumulator = + (InternalArray) agg.agg(accumulator, singletonArray(row(null, 2, "NULL_2", 2))); + accumulator = + (InternalArray) + agg.agg(accumulator, singletonArray(row(null, null, "NULL_NULL", 3))); accumulator = (InternalArray) agg.agg(accumulator, singletonArray(row(1, 2, "C", 5))); accumulator = @@ -1656,8 +1632,7 @@ public void testFieldNestedUpdateAggWithCountLimitWhenNestedKeyNullUseMergeStrat Arrays.asList( row(0, 1, "B_updated", 4), row(null, 2, "NULL_2", 2), - row(null, null, "NULL_NULL", 3) - )); + row(null, null, "NULL_NULL", 3))); } @Test @@ -1669,12 +1644,12 @@ public void testFieldNestedUpdateAggWithCountLimitWhenNestedKeyNullUseIgnoreStra DataTypes.FIELD(2, "v", DataTypes.STRING()), DataTypes.FIELD(3, "seq", DataTypes.INT())); - FieldNestedUpdateAgg agg = + FieldNestedUpdateAgg agg = new FieldNestedUpdateAgg( FieldNestedUpdateAggFactory.NAME, DataTypes.ARRAY(elementRowType), Arrays.asList("k0", "k1"), - CoreOptions.NestedKeyNullStrategy.IGNORE, // use ignore strategy + CoreOptions.NestedKeyNullStrategy.IGNORE, // use ignore 
strategy Collections.singletonList("seq"), 3); @@ -1683,8 +1658,11 @@ public void testFieldNestedUpdateAggWithCountLimitWhenNestedKeyNullUseIgnoreStra InternalArray.createElementGetter(elementRowType); accumulator = (InternalArray) agg.agg(accumulator, singletonArray(row(0, 1, "B", 1))); - accumulator = (InternalArray) agg.agg(accumulator, singletonArray(row(null, 2, "NULL_2", 2))); - accumulator = (InternalArray) agg.agg(accumulator, singletonArray(row(null, null, "NULL_NULL", 3))); + accumulator = + (InternalArray) agg.agg(accumulator, singletonArray(row(null, 2, "NULL_2", 2))); + accumulator = + (InternalArray) + agg.agg(accumulator, singletonArray(row(null, null, "NULL_NULL", 3))); accumulator = (InternalArray) agg.agg(accumulator, singletonArray(row(1, 2, "C", 3))); accumulator = @@ -1697,10 +1675,7 @@ public void testFieldNestedUpdateAggWithCountLimitWhenNestedKeyNullUseIgnoreStra assertThat(unnest(accumulator, elementGetter)) .containsExactlyInAnyOrderElementsOf( Arrays.asList( - row(0, 1, "B_updated", 4), - row(1, 2, "C", 3), - row(2, 3, "D", 5) - )); + row(0, 1, "B_updated", 4), row(1, 2, "C", 3), row(2, 3, "D", 5))); } @Test @@ -1728,21 +1703,25 @@ public void testFieldNestedUpdateAggWithCountLimitWhenNestedKeyNullUseThrowError accumulator = (InternalArray) agg.agg(accumulator, singletonArray(row(0, 1, "B", 1))); InternalArray finalAccumulator = accumulator; - assertThatThrownBy(() -> - agg.agg(finalAccumulator, singletonArray(row(null, 2, "NULL_2", 2))) - ) + assertThatThrownBy( + () -> agg.agg(finalAccumulator, singletonArray(row(null, 2, "NULL_2", 2)))) .isInstanceOf(IllegalArgumentException.class) - .hasMessage("Nested key contains null values. Primary key fields must not be null."); + .hasMessage( + "Nested key contains null values. 
Primary key fields must not be null."); - assertThatThrownBy(() -> - agg.agg(finalAccumulator, singletonArray(row(null, null, "NULL_NULL", 3))) - ) + assertThatThrownBy( + () -> + agg.agg( + finalAccumulator, + singletonArray(row(null, null, "NULL_NULL", 3)))) .isInstanceOf(IllegalArgumentException.class) - .hasMessage("Nested key contains null values. Primary key fields must not be null."); + .hasMessage( + "Nested key contains null values. Primary key fields must not be null."); accumulator = (InternalArray) agg.agg(accumulator, singletonArray(row(1, 2, "C", 3))); - accumulator = (InternalArray) agg.agg(accumulator, singletonArray(row(0, 1, "B_updated", 4))); + accumulator = + (InternalArray) agg.agg(accumulator, singletonArray(row(0, 1, "B_updated", 4))); assertThat(unnest(accumulator, elementGetter)) .containsExactlyInAnyOrderElementsOf( Arrays.asList(row(0, 1, "B_updated", 4), row(1, 2, "C", 3))); @@ -1751,10 +1730,7 @@ public void testFieldNestedUpdateAggWithCountLimitWhenNestedKeyNullUseThrowError assertThat(unnest(accumulator, elementGetter)) .containsExactlyInAnyOrderElementsOf( Arrays.asList( - row(0, 1, "B_updated", 4), - row(1, 2, "C", 3), - row(2, 3, "D", 5) - )); + row(0, 1, "B_updated", 4), row(1, 2, "C", 3), row(2, 3, "D", 5))); } private List unnest(InternalArray array, InternalArray.ElementGetter elementGetter) { From c310e1525562ac6181af8197c194ed75d281592d Mon Sep 17 00:00:00 2001 From: AuroraVoyage Date: Sun, 28 Jun 2026 15:10:49 +0800 Subject: [PATCH 3/3] [core] Introduce nested-key-null-strategy for handling non-PK-compliant nested keys in FieldNestedUpdateAgg --- .../java/org/apache/paimon/CoreOptions.java | 5 +- .../aggregate/FieldNestedUpdateAgg.java | 1 - .../factory/FieldNestedUpdateAggFactory.java | 7 +- .../aggregate/FieldAggregatorTest.java | 140 ++++++++---------- 4 files changed, 65 insertions(+), 88 deletions(-) diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index e6fd6137bad9..f0619a1e2ddd 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -5056,11 +5056,8 @@ public InlineElement getDescription() { } } - /** - * Strategy for handling rows whose nested-key contains null values. - */ + /** Strategy for handling rows whose nested-key contains null values. */ public enum NestedKeyNullStrategy implements DescribedEnum { - MERGE( "merge", "Merge rows even if the nested-key contains null values, without enforcing primary key semantics."), diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java index 9e72e8322f28..a4d0880d6f46 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java @@ -34,7 +34,6 @@ import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldNestedUpdateAggFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldNestedUpdateAggFactory.java index 9c8746f0f6e8..39bd5502e8df 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldNestedUpdateAggFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldNestedUpdateAggFactory.java @@ -70,6 +70,11 @@ private FieldNestedUpdateAgg createFieldNestedUpdateAgg( checkArgument(arrayType.getElementType() instanceof RowType, typeErrorMsg, fieldType); return new FieldNestedUpdateAgg( - identifier(), 
arrayType, nestedKey, nestedKeyNullStrategy, nestedSequenceField, countLimit); + identifier(), + arrayType, + nestedKey, + nestedKeyNullStrategy, + nestedSequenceField, + countLimit); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java index 80601bf3d229..b46bfd9bcfef 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java @@ -1107,8 +1107,9 @@ public void testFieldNestedUpdateAggWithSequenceFieldWithoutNestedKey() { Collections.singletonList("seq"), Integer.MAX_VALUE)) .isInstanceOf(IllegalArgumentException.class) - .hasMessage("Option 'fields..nested-sequence-field' requires " - + "'fields..nested-key' to be configured."); + .hasMessage( + "Option 'fields..nested-sequence-field' requires " + + "'fields..nested-key' to be configured."); } @Test @@ -1131,9 +1132,9 @@ public void testFieldNestedUpdateAggWithCountLimitWithSequenceFieldWithoutNested Collections.singletonList("seq"), 2)) .isInstanceOf(IllegalArgumentException.class) - .hasMessage("Option 'fields..nested-sequence-field' requires " + - "'fields..nested-key' to be configured." - ); + .hasMessage( + "Option 'fields..nested-sequence-field' requires " + + "'fields..nested-key' to be configured."); } @Test @@ -1344,11 +1345,10 @@ public void testFieldNestedUpdateAggWithoutNestedKeyNullStrategy() { FieldNestedUpdateAggFactory.NAME, DataTypes.ARRAY(elementRowType), Arrays.asList("k0", "k1"), - null, // Preserve the previous behavior. + null, // Preserve the previous behavior. 
Collections.emptyList(), Integer.MAX_VALUE); - InternalArray accumulator; InternalArray.ElementGetter elementGetter = InternalArray.createElementGetter(elementRowType); @@ -1368,11 +1368,7 @@ public void testFieldNestedUpdateAggWithoutNestedKeyNullStrategy() { accumulator = (InternalArray) agg.agg(accumulator, singletonArray(current)); assertThat(unnest(accumulator, elementGetter)) .containsExactlyInAnyOrderElementsOf( - Arrays.asList( - row(0, 0, "A", 1), - row(0, 1, "B", 2), - row(0, null, "C", 3) - )); + Arrays.asList(row(0, 0, "A", 1), row(0, 1, "B", 2), row(0, null, "C", 3))); current = row(null, null, "D", 4); accumulator = (InternalArray) agg.agg(accumulator, singletonArray(current)); @@ -1382,8 +1378,7 @@ public void testFieldNestedUpdateAggWithoutNestedKeyNullStrategy() { row(0, 0, "A", 1), row(0, 1, "B", 2), row(0, null, "C", 3), - row(null, null, "D", 4) - )); + row(null, null, "D", 4))); } @Test @@ -1400,7 +1395,7 @@ public void testFieldNestedUpdateAggWhenNestedKeyNullUseMergeStrategy() { FieldNestedUpdateAggFactory.NAME, DataTypes.ARRAY(elementRowType), Arrays.asList("k0", "k1"), - CoreOptions.NestedKeyNullStrategy.MERGE, // use merge strategy + CoreOptions.NestedKeyNullStrategy.MERGE, // use merge strategy Collections.emptyList(), Integer.MAX_VALUE); @@ -1424,7 +1419,6 @@ public void testFieldNestedUpdateAggWhenNestedKeyNullUseMergeStrategy() { assertThat(unnest(accumulatorEmpty, elementGetter)) .containsExactlyInAnyOrderElementsOf(Collections.singletonList(current)); - // ============================================================ // when the accumulator is non-empty // ============================================================ @@ -1446,11 +1440,7 @@ public void testFieldNestedUpdateAggWhenNestedKeyNullUseMergeStrategy() { accumulator = (InternalArray) agg.agg(accumulator, singletonArray(current)); assertThat(unnest(accumulator, elementGetter)) .containsExactlyInAnyOrderElementsOf( - Arrays.asList( - row(0, 0, "A", 1), - row(0, 1, "B", 2), - 
row(0, null, "C", 3) - )); + Arrays.asList(row(0, 0, "A", 1), row(0, 1, "B", 2), row(0, null, "C", 3))); // case 2: fully null nested key (all PK fields are null) current = row(null, null, "D", 4); @@ -1461,8 +1451,7 @@ public void testFieldNestedUpdateAggWhenNestedKeyNullUseMergeStrategy() { row(0, 0, "A", 1), row(0, 1, "B", 2), row(0, null, "C", 3), - row(null, null, "D", 4) - )); + row(null, null, "D", 4))); } @Test @@ -1479,14 +1468,13 @@ public void testFieldNestedUpdateAggWhenNestedKeyNullUseIgnoreStrategy() { FieldNestedUpdateAggFactory.NAME, DataTypes.ARRAY(elementRowType), Arrays.asList("k0", "k1"), - CoreOptions.NestedKeyNullStrategy.IGNORE, // use ignore strategy + CoreOptions.NestedKeyNullStrategy.IGNORE, // use ignore strategy Collections.emptyList(), Integer.MAX_VALUE); InternalArray.ElementGetter elementGetter = InternalArray.createElementGetter(elementRowType); - // ============================================================ // when the accumulator is empty // ============================================================ @@ -1500,7 +1488,6 @@ public void testFieldNestedUpdateAggWhenNestedKeyNullUseIgnoreStrategy() { accumulatorEmpty = (InternalArray) agg.agg(null, singletonArray(row(null, null, "D", 4))); assertThat(unnest(accumulatorEmpty, elementGetter)).isEmpty(); - // ============================================================ // when the accumulator is non-empty // ============================================================ @@ -1522,20 +1509,14 @@ public void testFieldNestedUpdateAggWhenNestedKeyNullUseIgnoreStrategy() { accumulator = (InternalArray) agg.agg(accumulator, singletonArray(current)); assertThat(unnest(accumulator, elementGetter)) .containsExactlyInAnyOrderElementsOf( - Arrays.asList( - row(0, 0, "A", 1), - row(0, 1, "B", 2) - )); + Arrays.asList(row(0, 0, "A", 1), row(0, 1, "B", 2))); // case 2: fully null nested key (all PK fields are null) current = row(null, null, "D", 4); accumulator = (InternalArray) agg.agg(accumulator, 
singletonArray(current)); assertThat(unnest(accumulator, elementGetter)) .containsExactlyInAnyOrderElementsOf( - Arrays.asList( - row(0, 0, "A", 1), - row(0, 1, "B", 2) - )); + Arrays.asList(row(0, 0, "A", 1), row(0, 1, "B", 2))); } @Test @@ -1552,31 +1533,25 @@ public void testFieldNestedUpdateAggWhenNestedKeyNullUseThrowErrorStrategy() { FieldNestedUpdateAggFactory.NAME, DataTypes.ARRAY(elementRowType), Arrays.asList("k0", "k1"), - CoreOptions.NestedKeyNullStrategy.ERROR, // use error strategy + CoreOptions.NestedKeyNullStrategy.ERROR, // use error strategy Collections.emptyList(), Integer.MAX_VALUE); - // ============================================================ // when the accumulator is empty // ============================================================ // case 1: partially null nested key (some PK fields are null) - assertThatThrownBy(() -> agg.agg( - null, - singletonArray(row(0, null, "C", 3)) - )) + assertThatThrownBy(() -> agg.agg(null, singletonArray(row(0, null, "C", 3)))) .isInstanceOf(IllegalArgumentException.class) - .hasMessage("Nested key contains null values. Primary key fields must not be null."); + .hasMessage( + "Nested key contains null values. Primary key fields must not be null."); // case 2: fully null nested key (all PK fields are null) - assertThatThrownBy(() -> agg.agg( - null, - singletonArray(row(null, null, "D", 4)) - )) + assertThatThrownBy(() -> agg.agg(null, singletonArray(row(null, null, "D", 4)))) .isInstanceOf(IllegalArgumentException.class) - .hasMessage("Nested key contains null values. Primary key fields must not be null."); - + .hasMessage( + "Nested key contains null values. 
Primary key fields must not be null."); // ============================================================ // when the accumulator is non-empty @@ -1598,12 +1573,10 @@ public void testFieldNestedUpdateAggWhenNestedKeyNullUseThrowErrorStrategy() { // case 1: partially null nested key (some PK fields are null) InternalArray finalAccumulator1 = accumulator; - assertThatThrownBy(() -> agg.agg( - finalAccumulator1, - singletonArray(row(0, null, "C", 3)) - )) + assertThatThrownBy(() -> agg.agg(finalAccumulator1, singletonArray(row(0, null, "C", 3)))) .isInstanceOf(IllegalArgumentException.class) - .hasMessage("Nested key contains null values. Primary key fields must not be null."); + .hasMessage( + "Nested key contains null values. Primary key fields must not be null."); assertThat(unnest(accumulator, elementGetter)) .containsExactlyInAnyOrderElementsOf( @@ -1611,11 +1584,11 @@ public void testFieldNestedUpdateAggWhenNestedKeyNullUseThrowErrorStrategy() { // case 2: fully null nested key (all PK fields are null) InternalArray finalAccumulator2 = accumulator; - assertThatThrownBy(() -> agg.agg( - finalAccumulator2, singletonArray(row(null, null, "D", 4)) - )) + assertThatThrownBy( + () -> agg.agg(finalAccumulator2, singletonArray(row(null, null, "D", 4)))) .isInstanceOf(IllegalArgumentException.class) - .hasMessage("Nested key contains null values. Primary key fields must not be null."); + .hasMessage( + "Nested key contains null values. 
Primary key fields must not be null."); assertThat(unnest(accumulator, elementGetter)) .containsExactlyInAnyOrderElementsOf( @@ -1636,7 +1609,7 @@ public void testFieldNestedUpdateAggWithCountLimitWhenNestedKeyNullUseMergeStrat FieldNestedUpdateAggFactory.NAME, DataTypes.ARRAY(elementRowType), Arrays.asList("k0", "k1"), - CoreOptions.NestedKeyNullStrategy.MERGE, // use merge strategy + CoreOptions.NestedKeyNullStrategy.MERGE, // use merge strategy Collections.singletonList("seq"), 3); @@ -1645,8 +1618,11 @@ public void testFieldNestedUpdateAggWithCountLimitWhenNestedKeyNullUseMergeStrat InternalArray.createElementGetter(elementRowType); accumulator = (InternalArray) agg.agg(accumulator, singletonArray(row(0, 1, "B", 1))); - accumulator = (InternalArray) agg.agg(accumulator, singletonArray(row(null, 2, "NULL_2", 2))); - accumulator = (InternalArray) agg.agg(accumulator, singletonArray(row(null, null, "NULL_NULL", 3))); + accumulator = + (InternalArray) agg.agg(accumulator, singletonArray(row(null, 2, "NULL_2", 2))); + accumulator = + (InternalArray) + agg.agg(accumulator, singletonArray(row(null, null, "NULL_NULL", 3))); accumulator = (InternalArray) agg.agg(accumulator, singletonArray(row(1, 2, "C", 5))); accumulator = @@ -1656,8 +1632,7 @@ public void testFieldNestedUpdateAggWithCountLimitWhenNestedKeyNullUseMergeStrat Arrays.asList( row(0, 1, "B_updated", 4), row(null, 2, "NULL_2", 2), - row(null, null, "NULL_NULL", 3) - )); + row(null, null, "NULL_NULL", 3))); } @Test @@ -1669,12 +1644,12 @@ public void testFieldNestedUpdateAggWithCountLimitWhenNestedKeyNullUseIgnoreStra DataTypes.FIELD(2, "v", DataTypes.STRING()), DataTypes.FIELD(3, "seq", DataTypes.INT())); - FieldNestedUpdateAgg agg = + FieldNestedUpdateAgg agg = new FieldNestedUpdateAgg( FieldNestedUpdateAggFactory.NAME, DataTypes.ARRAY(elementRowType), Arrays.asList("k0", "k1"), - CoreOptions.NestedKeyNullStrategy.IGNORE, // use ignore strategy + CoreOptions.NestedKeyNullStrategy.IGNORE, // use ignore 
strategy Collections.singletonList("seq"), 3); @@ -1683,8 +1658,11 @@ public void testFieldNestedUpdateAggWithCountLimitWhenNestedKeyNullUseIgnoreStra InternalArray.createElementGetter(elementRowType); accumulator = (InternalArray) agg.agg(accumulator, singletonArray(row(0, 1, "B", 1))); - accumulator = (InternalArray) agg.agg(accumulator, singletonArray(row(null, 2, "NULL_2", 2))); - accumulator = (InternalArray) agg.agg(accumulator, singletonArray(row(null, null, "NULL_NULL", 3))); + accumulator = + (InternalArray) agg.agg(accumulator, singletonArray(row(null, 2, "NULL_2", 2))); + accumulator = + (InternalArray) + agg.agg(accumulator, singletonArray(row(null, null, "NULL_NULL", 3))); accumulator = (InternalArray) agg.agg(accumulator, singletonArray(row(1, 2, "C", 3))); accumulator = @@ -1697,10 +1675,7 @@ public void testFieldNestedUpdateAggWithCountLimitWhenNestedKeyNullUseIgnoreStra assertThat(unnest(accumulator, elementGetter)) .containsExactlyInAnyOrderElementsOf( Arrays.asList( - row(0, 1, "B_updated", 4), - row(1, 2, "C", 3), - row(2, 3, "D", 5) - )); + row(0, 1, "B_updated", 4), row(1, 2, "C", 3), row(2, 3, "D", 5))); } @Test @@ -1728,21 +1703,25 @@ public void testFieldNestedUpdateAggWithCountLimitWhenNestedKeyNullUseThrowError accumulator = (InternalArray) agg.agg(accumulator, singletonArray(row(0, 1, "B", 1))); InternalArray finalAccumulator = accumulator; - assertThatThrownBy(() -> - agg.agg(finalAccumulator, singletonArray(row(null, 2, "NULL_2", 2))) - ) + assertThatThrownBy( + () -> agg.agg(finalAccumulator, singletonArray(row(null, 2, "NULL_2", 2)))) .isInstanceOf(IllegalArgumentException.class) - .hasMessage("Nested key contains null values. Primary key fields must not be null."); + .hasMessage( + "Nested key contains null values. 
Primary key fields must not be null."); - assertThatThrownBy(() -> - agg.agg(finalAccumulator, singletonArray(row(null, null, "NULL_NULL", 3))) - ) + assertThatThrownBy( + () -> + agg.agg( + finalAccumulator, + singletonArray(row(null, null, "NULL_NULL", 3)))) .isInstanceOf(IllegalArgumentException.class) - .hasMessage("Nested key contains null values. Primary key fields must not be null."); + .hasMessage( + "Nested key contains null values. Primary key fields must not be null."); accumulator = (InternalArray) agg.agg(accumulator, singletonArray(row(1, 2, "C", 3))); - accumulator = (InternalArray) agg.agg(accumulator, singletonArray(row(0, 1, "B_updated", 4))); + accumulator = + (InternalArray) agg.agg(accumulator, singletonArray(row(0, 1, "B_updated", 4))); assertThat(unnest(accumulator, elementGetter)) .containsExactlyInAnyOrderElementsOf( Arrays.asList(row(0, 1, "B_updated", 4), row(1, 2, "C", 3))); @@ -1751,10 +1730,7 @@ public void testFieldNestedUpdateAggWithCountLimitWhenNestedKeyNullUseThrowError assertThat(unnest(accumulator, elementGetter)) .containsExactlyInAnyOrderElementsOf( Arrays.asList( - row(0, 1, "B_updated", 4), - row(1, 2, "C", 3), - row(2, 3, "D", 5) - )); + row(0, 1, "B_updated", 4), row(1, 2, "C", 3), row(2, 3, "D", 5))); } private List unnest(InternalArray array, InternalArray.ElementGetter elementGetter) {