diff --git a/docs/docs/primary-key-table/merge-engine/aggregation.mdx b/docs/docs/primary-key-table/merge-engine/aggregation.mdx index 96e6e4f3f076..c8ed77f3f42d 100644 --- a/docs/docs/primary-key-table/merge-engine/aggregation.mdx +++ b/docs/docs/primary-key-table/merge-engine/aggregation.mdx @@ -311,6 +311,14 @@ public static class BitmapContainsUDF extends ScalarFunction { Use `fields..nested-key=pk0,pk1,...` to specify the primary keys of the nested table. If no keys, row will be appended to array\. + Use `fields..nested-key-null-strategy=` to specify how rows are handled when the nested-key does not satisfy primary key semantics (e.g., primary key fields contain null values). + + This option is optional. If it is not configured, the behavior is equivalent to using `merge`. + + - `merge`: Merge rows even if the nested-key does not satisfy primary key semantics. This is the same behavior as when this option is not configured. + - `ignore`: Ignore rows whose nested-key contains null values because they do not satisfy primary key semantics. + - `error`: Throw an exception if the nested-key contains null values, because primary key fields must not be null. + Use `fields..nested-sequence-field=seq0,seq1,...` to control the update sequence of a nested table, you must configure `fields..nested-key` when using it. Use `fields..count-limit=` to specify the maximum number of rows in the nested table. When no nested-key, it will select data diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index e3b6b01952cc..f0619a1e2ddd 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -80,6 +80,8 @@ public class CoreOptions implements Serializable { public static final String NESTED_KEY = "nested-key"; + public static final String NESTED_KEY_NULL_STRATEGY = "nested-key-null-strategy"; + public static final String NESTED_SEQUENCE_FIELD = "nested-sequence-field"; public static final String COUNT_LIMIT = "count-limit"; @@ -2943,6 +2945,13 @@ public List fieldNestedUpdateAggNestedKey(String fieldName) { return Arrays.stream(keyString.split(",")).map(String::trim).collect(Collectors.toList()); } + public NestedKeyNullStrategy fieldNestedUpdateAggNestedKeyNullStrategy(String fieldName) { + return options.get( + key(FIELDS_PREFIX + "." + fieldName + "." + NESTED_KEY_NULL_STRATEGY) + .enumType(NestedKeyNullStrategy.class) + .noDefaultValue()); + } + public List fieldNestedUpdateAggNestedSequenceField(String fieldName) { String keyString = options.get( @@ -5046,4 +5055,37 @@ public InlineElement getDescription() { return text(description); } } + + /** Strategy for handling rows whose nested-key contains null values. */ + public enum NestedKeyNullStrategy implements DescribedEnum { + MERGE( + "merge", + "Merge rows even if the nested-key contains null values, without enforcing primary key semantics."), + + IGNORE( + "ignore", + "Ignore rows whose nested-key contains null values because they do not satisfy primary key semantics."), + + ERROR( + "error", + "Throw an exception if the nested-key contains null values, because primary key fields must not be null."); + + private final String value; + private final String description; + + NestedKeyNullStrategy(String value, String description) { + this.value = value; + this.description = description; + } + + @Override + public InlineElement getDescription() { + return text(description); + } + + @Override + public String toString() { + return value; + } + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java index 03c4c78500a0..42b6de482f43 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java @@ -18,6 +18,7 @@ package org.apache.paimon.mergetree.compact.aggregate; +import org.apache.paimon.CoreOptions; import org.apache.paimon.codegen.Projection; import org.apache.paimon.codegen.RecordComparator; import org.apache.paimon.codegen.RecordEqualiser; @@ -32,7 +33,7 @@ import javax.annotation.Nullable; import java.util.ArrayList; -import java.util.Collections; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -61,17 +62,14 @@ public class FieldNestedUpdateAgg extends FieldAggregator { @Nullable private final RecordComparator sequenceComparator; private final boolean hasSequenceField; + private final CoreOptions.NestedKeyNullStrategy nestedKeyNullStrategy; private final int countLimit; - public FieldNestedUpdateAgg( - String name, ArrayType dataType, List nestedKey, int countLimit) { - this(name, dataType, nestedKey, Collections.emptyList(), countLimit); - } - public FieldNestedUpdateAgg( String name, ArrayType dataType, List nestedKey, + CoreOptions.NestedKeyNullStrategy nestedKeyNullStrategy, List nestedSequenceField, int countLimit) { super(name, dataType); @@ -85,12 +83,24 @@ public FieldNestedUpdateAgg( this.elementEqualiser = null; } + checkArgument( + nestedKeyNullStrategy == null || this.keyProjection != null, + "Option 'fields..nested-key-null-strategy' requires " + + "'fields..nested-key' to be configured."); + + // Default to MERGE to preserve the previous behavior. + this.nestedKeyNullStrategy = + nestedKeyNullStrategy == null + ? CoreOptions.NestedKeyNullStrategy.MERGE + : nestedKeyNullStrategy; + // If nestedSequenceField is set, we need to compare sequence fields to determine // whether to update. Only update when the new sequence is greater than the old one. if (!nestedSequenceField.isEmpty()) { checkArgument( this.keyProjection != null, - "nested-sequence-field requires nested-key to be set."); + "Option 'fields..nested-sequence-field' requires " + + "'fields..nested-key' to be configured."); this.sequenceProjection = newProjection(nestedType, nestedSequenceField); this.hasSequenceField = true; @@ -179,14 +189,22 @@ public Object retract(Object accumulator, Object retractField) { continue; } InternalRow row = acc.getRow(i, nestedFields); - map.put(keyProjection.apply(row).copy(), row); + BinaryRow key = keyProjection.apply(row).copy(); + if (!applyNestedKeyNullStrategy(key)) { + continue; + } + map.put(key, row); } for (int i = 0; i < retract.size(); i++) { if (retract.isNullAt(i)) { continue; } - map.remove(keyProjection.apply(retract.getRow(i, nestedFields))); + BinaryRow key = keyProjection.apply(retract.getRow(i, nestedFields)).copy(); + if (!applyNestedKeyNullStrategy(key)) { + continue; + } + map.remove(key); } return new GenericArray(new ArrayList<>(map.values()).toArray()); @@ -243,6 +261,11 @@ private void addNestedRows( InternalRow row = array.getRow(i, nestedFields); BinaryRow key = keyProjection.apply(row).copy(); + + if (!applyNestedKeyNullStrategy(key)) { + continue; + } + InternalRow existing = rows.get(key); if (existing != null) { if (!hasSequenceField || compareSequence(row, existing) >= 0) { @@ -253,4 +276,27 @@ private void addNestedRows( } } } + + private boolean applyNestedKeyNullStrategy(BinaryRow key) { + if (!key.anyNull()) { + // The nested-key satisfies primary key semantics. + return true; + } + switch (nestedKeyNullStrategy) { + case MERGE: + // Preserve the previous behavior. + return true; + case IGNORE: + return false; + case ERROR: + throw new IllegalArgumentException( + "Nested key contains null values. Primary key fields must not be null."); + default: + throw new UnsupportedOperationException( + String.format( + "Unsupported nested-key-null-strategy '%s'. Supported values are: %s.", + nestedKeyNullStrategy, + Arrays.toString(CoreOptions.NestedKeyNullStrategy.values()))); + } + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldNestedUpdateAggFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldNestedUpdateAggFactory.java index ca0d9614986a..39bd5502e8df 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldNestedUpdateAggFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldNestedUpdateAggFactory.java @@ -39,6 +39,7 @@ public FieldNestedUpdateAgg create(DataType fieldType, CoreOptions options, Stri return createFieldNestedUpdateAgg( fieldType, options.fieldNestedUpdateAggNestedKey(field), + options.fieldNestedUpdateAggNestedKeyNullStrategy(field), options.fieldNestedUpdateAggNestedSequenceField(field), options.fieldNestedUpdateAggCountLimit(field)); } @@ -51,6 +52,7 @@ public String identifier() { private FieldNestedUpdateAgg createFieldNestedUpdateAgg( DataType fieldType, List nestedKey, + CoreOptions.NestedKeyNullStrategy nestedKeyNullStrategy, List nestedSequenceField, int countLimit) { if (nestedKey == null) { @@ -68,6 +70,11 @@ private FieldNestedUpdateAgg createFieldNestedUpdateAgg( checkArgument(arrayType.getElementType() instanceof RowType, typeErrorMsg, fieldType); return new FieldNestedUpdateAgg( - identifier(), arrayType, nestedKey, nestedSequenceField, countLimit); + identifier(), + arrayType, + nestedKey, + nestedKeyNullStrategy, + nestedSequenceField, + countLimit); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java index c791fd149cdb..aeb5f5a13044 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java @@ -734,6 +734,8 @@ public void testFieldNestedUpdateAgg() { DataTypes.FIELD(1, "k1", DataTypes.INT()), DataTypes.FIELD(2, "v", DataTypes.STRING()))), Arrays.asList("k0", "k1"), + null, + Collections.emptyList(), Integer.MAX_VALUE); InternalArray accumulator; @@ -773,6 +775,8 @@ public void testFieldNestedAppendAgg() { FieldNestedUpdateAggFactory.NAME, DataTypes.ARRAY(elementRowType), Collections.emptyList(), + null, + Collections.emptyList(), Integer.MAX_VALUE); InternalArray accumulator = null; @@ -807,6 +811,8 @@ public void testFieldNestedAppendAggWithCountLimit() { FieldNestedUpdateAggFactory.NAME, DataTypes.ARRAY(elementRowType), Collections.emptyList(), + null, + Collections.emptyList(), 2); InternalArray accumulator = null; @@ -842,6 +848,8 @@ public void testFieldNestedAppendAggWithCountLimitOnFirstInputArray() { FieldNestedUpdateAggFactory.NAME, DataTypes.ARRAY(elementRowType), Collections.emptyList(), + null, + Collections.emptyList(), 2); InternalArray.ElementGetter elementGetter = @@ -867,6 +875,8 @@ public void testFieldNestedUpdateAggWithCountLimitUpdatesExistingKeyAtLimitWitho FieldNestedUpdateAggFactory.NAME, DataTypes.ARRAY(elementRowType), Arrays.asList("k0", "k1"), + null, + Collections.emptyList(), 2); InternalArray accumulator = null; @@ -900,6 +910,8 @@ public void testFieldNestedUpdateAggWithCountLimitOnFirstInputArrayWithoutSequen FieldNestedUpdateAggFactory.NAME, DataTypes.ARRAY(elementRowType), Arrays.asList("k0", "k1"), + null, + Collections.emptyList(), 2); InternalArray.ElementGetter elementGetter = @@ -932,6 +944,7 @@ public void testFieldNestedUpdateAggWithSequenceField() { FieldNestedUpdateAggFactory.NAME, DataTypes.ARRAY(elementRowType), Arrays.asList("k0", "k1"), + null, Collections.singletonList("seq"), Integer.MAX_VALUE); @@ -983,6 +996,7 @@ public void testFieldNestedUpdateAggWithMultipleSequenceFields() { FieldNestedUpdateAggFactory.NAME, DataTypes.ARRAY(elementRowType), Arrays.asList("k0", "k1"), + null, Arrays.asList("seq", "ts"), Integer.MAX_VALUE); @@ -1089,10 +1103,13 @@ public void testFieldNestedUpdateAggWithSequenceFieldWithoutNestedKey() { FieldNestedUpdateAggFactory.NAME, DataTypes.ARRAY(elementRowType), Collections.emptyList(), + null, Collections.singletonList("seq"), Integer.MAX_VALUE)) .isInstanceOf(IllegalArgumentException.class) - .hasMessage("nested-sequence-field requires nested-key to be set."); + .hasMessage( + "Option 'fields..nested-sequence-field' requires " + + "'fields..nested-key' to be configured."); } @Test @@ -1111,10 +1128,13 @@ public void testFieldNestedUpdateAggWithCountLimitWithSequenceFieldWithoutNested FieldNestedUpdateAggFactory.NAME, DataTypes.ARRAY(elementRowType), Collections.emptyList(), + null, Collections.singletonList("seq"), 2)) .isInstanceOf(IllegalArgumentException.class) - .hasMessage("nested-sequence-field requires nested-key to be set."); + .hasMessage( + "Option 'fields..nested-sequence-field' requires " + + "'fields..nested-key' to be configured."); } @Test @@ -1131,6 +1151,7 @@ public void testFieldNestedUpdateAggWithCountLimitWithSequenceField() { FieldNestedUpdateAggFactory.NAME, DataTypes.ARRAY(elementRowType), Arrays.asList("k0", "k1"), + null, Collections.singletonList("seq"), 2); // Enforce count limit = 2 @@ -1179,6 +1200,7 @@ public void testFieldNestedUpdateAggWithCountLimitUpdatesExistingKeyAtLimit() { FieldNestedUpdateAggFactory.NAME, DataTypes.ARRAY(elementRowType), Arrays.asList("k0", "k1"), + null, Collections.singletonList("seq"), 2); @@ -1215,6 +1237,7 @@ public void testFieldNestedUpdateAggWithCountLimitOnFirstInputArrayWithSequence( FieldNestedUpdateAggFactory.NAME, DataTypes.ARRAY(elementRowType), Arrays.asList("k0", "k1"), + null, Collections.singletonList("seq"), 2); @@ -1235,6 +1258,611 @@ public void testFieldNestedUpdateAggWithCountLimitOnFirstInputArrayWithSequence( Arrays.asList(row(0, 1, "B_updated", 4), row(1, 2, "C", 3))); } + @Test + public void testFieldNestedUpdateAggWithNestedKeyNullStrategyArgumentCheck() { + DataType elementRowType = + DataTypes.ROW( + DataTypes.FIELD(0, "k0", DataTypes.INT()), + DataTypes.FIELD(1, "k1", DataTypes.INT()), + DataTypes.FIELD(2, "v", DataTypes.STRING()), + DataTypes.FIELD(3, "seq", DataTypes.INT())); + + // ============================================================ + // 1. nested-key is not specified, but nested-key-null-strategy is provided + // This should be rejected because strategy depends on nested-key. + // ============================================================ + org.assertj.core.api.Assertions.assertThatThrownBy( + () -> + new FieldNestedUpdateAgg( + FieldNestedUpdateAggFactory.NAME, + DataTypes.ARRAY(elementRowType), + Collections.emptyList(), + CoreOptions.NestedKeyNullStrategy.MERGE, + Collections.emptyList(), + 2)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage( + "Option 'fields..nested-key-null-strategy' requires " + + "'fields..nested-key' to be configured."); + + // ============================================================ + // 2. neither nested-key nor nested-key-null-strategy is specified + // This is valid and should fall back to default behavior (MERGE). + // ============================================================ + FieldNestedUpdateAgg agg1 = + new FieldNestedUpdateAgg( + FieldNestedUpdateAggFactory.NAME, + DataTypes.ARRAY(elementRowType), + Collections.emptyList(), + null, + Collections.emptyList(), + 2); + + org.assertj.core.api.Assertions.assertThat(agg1).isNotNull(); + + // ============================================================ + // 3. nested-key is specified but nested-key-null-strategy is not + // This is valid and should use default strategy (MERGE). + // ============================================================ + FieldNestedUpdateAgg agg2 = + new FieldNestedUpdateAgg( + FieldNestedUpdateAggFactory.NAME, + DataTypes.ARRAY(elementRowType), + Collections.singletonList("k0"), + null, + Collections.emptyList(), + 2); + + org.assertj.core.api.Assertions.assertThat(agg2).isNotNull(); + + // ============================================================ + // 4. both nested-key and nested-key-null-strategy are specified + // This should be accepted and use the provided strategy. + // ============================================================ + FieldNestedUpdateAgg agg3 = + new FieldNestedUpdateAgg( + FieldNestedUpdateAggFactory.NAME, + DataTypes.ARRAY(elementRowType), + Collections.singletonList("k0"), + CoreOptions.NestedKeyNullStrategy.MERGE, + Collections.emptyList(), + 2); + + org.assertj.core.api.Assertions.assertThat(agg3).isNotNull(); + } + + @Test + public void testFieldNestedUpdateAggWithoutNestedKeyNullStrategy() { + DataType elementRowType = + DataTypes.ROW( + DataTypes.FIELD(0, "k0", DataTypes.INT()), + DataTypes.FIELD(1, "k1", DataTypes.INT()), + DataTypes.FIELD(2, "v", DataTypes.STRING()), + DataTypes.FIELD(3, "seq", DataTypes.INT())); + + FieldNestedUpdateAgg agg = + new FieldNestedUpdateAgg( + FieldNestedUpdateAggFactory.NAME, + DataTypes.ARRAY(elementRowType), + Arrays.asList("k0", "k1"), + null, // Preserve the previous behavior. + Collections.emptyList(), + Integer.MAX_VALUE); + + InternalArray accumulator; + InternalArray.ElementGetter elementGetter = + InternalArray.createElementGetter(elementRowType); + + InternalRow current = row(0, 0, "A", 1); + accumulator = (InternalArray) agg.agg(null, singletonArray(current)); + assertThat(unnest(accumulator, elementGetter)) + .containsExactlyInAnyOrderElementsOf(Collections.singletonList(current)); + + current = row(0, 1, "B", 2); + accumulator = (InternalArray) agg.agg(accumulator, singletonArray(current)); + assertThat(unnest(accumulator, elementGetter)) + .containsExactlyInAnyOrderElementsOf( + Arrays.asList(row(0, 0, "A", 1), row(0, 1, "B", 2))); + + current = row(0, null, "C", 3); + accumulator = (InternalArray) agg.agg(accumulator, singletonArray(current)); + assertThat(unnest(accumulator, elementGetter)) + .containsExactlyInAnyOrderElementsOf( + Arrays.asList(row(0, 0, "A", 1), row(0, 1, "B", 2), row(0, null, "C", 3))); + + current = row(null, null, "D", 4); + accumulator = (InternalArray) agg.agg(accumulator, singletonArray(current)); + assertThat(unnest(accumulator, elementGetter)) + .containsExactlyInAnyOrderElementsOf( + Arrays.asList( + row(0, 0, "A", 1), + row(0, 1, "B", 2), + row(0, null, "C", 3), + row(null, null, "D", 4))); + } + + @Test + public void testFieldNestedUpdateAggWhenNestedKeyNullUseMergeStrategy() { + DataType elementRowType = + DataTypes.ROW( + DataTypes.FIELD(0, "k0", DataTypes.INT()), + DataTypes.FIELD(1, "k1", DataTypes.INT()), + DataTypes.FIELD(2, "v", DataTypes.STRING()), + DataTypes.FIELD(3, "seq", DataTypes.INT())); + + FieldNestedUpdateAgg agg = + new FieldNestedUpdateAgg( + FieldNestedUpdateAggFactory.NAME, + DataTypes.ARRAY(elementRowType), + Arrays.asList("k0", "k1"), + CoreOptions.NestedKeyNullStrategy.MERGE, // use merge strategy + Collections.emptyList(), + Integer.MAX_VALUE); + + InternalArray.ElementGetter elementGetter = + InternalArray.createElementGetter(elementRowType); + + // ============================================================ + // when the accumulator is empty + // ============================================================ + InternalArray accumulatorEmpty; + InternalRow current = row(0, null, "C", 3); + + // case 1: partially null nested key (some PK fields are null) + accumulatorEmpty = (InternalArray) agg.agg(null, singletonArray(current)); + assertThat(unnest(accumulatorEmpty, elementGetter)) + .containsExactlyInAnyOrderElementsOf(Collections.singletonList(current)); + + // case 2: fully null nested key (all PK fields are null) + current = row(null, null, "D", 4); + accumulatorEmpty = (InternalArray) agg.agg(null, singletonArray(current)); + assertThat(unnest(accumulatorEmpty, elementGetter)) + .containsExactlyInAnyOrderElementsOf(Collections.singletonList(current)); + + // ============================================================ + // when the accumulator is non-empty + // ============================================================ + InternalArray accumulator; + + current = row(0, 0, "A", 1); + accumulator = (InternalArray) agg.agg(null, singletonArray(current)); + assertThat(unnest(accumulator, elementGetter)) + .containsExactlyInAnyOrderElementsOf(Collections.singletonList(current)); + + current = row(0, 1, "B", 2); + accumulator = (InternalArray) agg.agg(accumulator, singletonArray(current)); + assertThat(unnest(accumulator, elementGetter)) + .containsExactlyInAnyOrderElementsOf( + Arrays.asList(row(0, 0, "A", 1), row(0, 1, "B", 2))); + + // case 1: partially null nested key (some PK fields are null) + current = row(0, null, "C", 3); + accumulator = (InternalArray) agg.agg(accumulator, singletonArray(current)); + assertThat(unnest(accumulator, elementGetter)) + .containsExactlyInAnyOrderElementsOf( + Arrays.asList(row(0, 0, "A", 1), row(0, 1, "B", 2), row(0, null, "C", 3))); + + // case 2: fully null nested key (all PK fields are null) + current = row(null, null, "D", 4); + accumulator = (InternalArray) agg.agg(accumulator, singletonArray(current)); + assertThat(unnest(accumulator, elementGetter)) + .containsExactlyInAnyOrderElementsOf( + Arrays.asList( + row(0, 0, "A", 1), + row(0, 1, "B", 2), + row(0, null, "C", 3), + row(null, null, "D", 4))); + } + + @Test + public void testFieldNestedUpdateAggWhenNestedKeyNullUseIgnoreStrategy() { + DataType elementRowType = + DataTypes.ROW( + DataTypes.FIELD(0, "k0", DataTypes.INT()), + DataTypes.FIELD(1, "k1", DataTypes.INT()), + DataTypes.FIELD(2, "v", DataTypes.STRING()), + DataTypes.FIELD(3, "seq", DataTypes.INT())); + + FieldNestedUpdateAgg agg = + new FieldNestedUpdateAgg( + FieldNestedUpdateAggFactory.NAME, + DataTypes.ARRAY(elementRowType), + Arrays.asList("k0", "k1"), + CoreOptions.NestedKeyNullStrategy.IGNORE, // use ignore strategy + Collections.emptyList(), + Integer.MAX_VALUE); + + InternalArray.ElementGetter elementGetter = + InternalArray.createElementGetter(elementRowType); + + // ============================================================ + // when the accumulator is empty + // ============================================================ + InternalArray accumulatorEmpty; + + // case 1: partially null nested key (some PK fields are null) + accumulatorEmpty = (InternalArray) agg.agg(null, singletonArray(row(0, null, "C", 3))); + assertThat(unnest(accumulatorEmpty, elementGetter)).isEmpty(); + + // case 2: fully null nested key (all PK fields are null) + accumulatorEmpty = (InternalArray) agg.agg(null, singletonArray(row(null, null, "D", 4))); + assertThat(unnest(accumulatorEmpty, elementGetter)).isEmpty(); + + // ============================================================ + // when the accumulator is non-empty + // ============================================================ + InternalArray accumulator; + + InternalRow current = row(0, 0, "A", 1); + accumulator = (InternalArray) agg.agg(null, singletonArray(current)); + assertThat(unnest(accumulator, elementGetter)) + .containsExactlyInAnyOrderElementsOf(Collections.singletonList(current)); + + current = row(0, 1, "B", 2); + accumulator = (InternalArray) agg.agg(accumulator, singletonArray(current)); + assertThat(unnest(accumulator, elementGetter)) + .containsExactlyInAnyOrderElementsOf( + Arrays.asList(row(0, 0, "A", 1), row(0, 1, "B", 2))); + + // case 1: partially null nested key (some PK fields are null) + current = row(0, null, "C", 3); + accumulator = (InternalArray) agg.agg(accumulator, singletonArray(current)); + assertThat(unnest(accumulator, elementGetter)) + .containsExactlyInAnyOrderElementsOf( + Arrays.asList(row(0, 0, "A", 1), row(0, 1, "B", 2))); + + // case 2: fully null nested key (all PK fields are null) + current = row(null, null, "D", 4); + accumulator = (InternalArray) agg.agg(accumulator, singletonArray(current)); + assertThat(unnest(accumulator, elementGetter)) + .containsExactlyInAnyOrderElementsOf( + Arrays.asList(row(0, 0, "A", 1), row(0, 1, "B", 2))); + } + + @Test + public void testFieldNestedUpdateAggWhenNestedKeyNullUseThrowErrorStrategy() { + DataType elementRowType = + DataTypes.ROW( + DataTypes.FIELD(0, "k0", DataTypes.INT()), + DataTypes.FIELD(1, "k1", DataTypes.INT()), + DataTypes.FIELD(2, "v", DataTypes.STRING()), + DataTypes.FIELD(3, "seq", DataTypes.INT())); + + FieldNestedUpdateAgg agg = + new FieldNestedUpdateAgg( + FieldNestedUpdateAggFactory.NAME, + DataTypes.ARRAY(elementRowType), + Arrays.asList("k0", "k1"), + CoreOptions.NestedKeyNullStrategy.ERROR, // use error strategy + Collections.emptyList(), + Integer.MAX_VALUE); + + // ============================================================ + // when the accumulator is empty + // ============================================================ + + // case 1: partially null nested key (some PK fields are null) + assertThatThrownBy(() -> agg.agg(null, singletonArray(row(0, null, "C", 3)))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage( + "Nested key contains null values. Primary key fields must not be null."); + + // case 2: fully null nested key (all PK fields are null) + assertThatThrownBy(() -> agg.agg(null, singletonArray(row(null, null, "D", 4)))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage( + "Nested key contains null values. Primary key fields must not be null."); + + // ============================================================ + // when the accumulator is non-empty + // ============================================================ + InternalArray accumulator; + InternalArray.ElementGetter elementGetter = + InternalArray.createElementGetter(elementRowType); + + InternalRow current = row(0, 0, "A", 1); + accumulator = (InternalArray) agg.agg(null, singletonArray(current)); + assertThat(unnest(accumulator, elementGetter)) + .containsExactlyInAnyOrderElementsOf(Collections.singletonList(current)); + + current = row(0, 1, "B", 2); + accumulator = (InternalArray) agg.agg(accumulator, singletonArray(current)); + assertThat(unnest(accumulator, elementGetter)) + .containsExactlyInAnyOrderElementsOf( + Arrays.asList(row(0, 0, "A", 1), row(0, 1, "B", 2))); + + // case 1: partially null nested key (some PK fields are null) + InternalArray finalAccumulator1 = accumulator; + assertThatThrownBy(() -> agg.agg(finalAccumulator1, singletonArray(row(0, null, "C", 3)))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage( + "Nested key contains null values. Primary key fields must not be null."); + + assertThat(unnest(accumulator, elementGetter)) + .containsExactlyInAnyOrderElementsOf( + Arrays.asList(row(0, 0, "A", 1), row(0, 1, "B", 2))); + + // case 2: fully null nested key (all PK fields are null) + InternalArray finalAccumulator2 = accumulator; + assertThatThrownBy( + () -> agg.agg(finalAccumulator2, singletonArray(row(null, null, "D", 4)))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage( + "Nested key contains null values. Primary key fields must not be null."); + + assertThat(unnest(accumulator, elementGetter)) + .containsExactlyInAnyOrderElementsOf( + Arrays.asList(row(0, 0, "A", 1), row(0, 1, "B", 2))); + } + + @Test + public void testFieldNestedUpdateAggWithCountLimitWhenNestedKeyNullUseMergeStrategy() { + DataType elementRowType = + DataTypes.ROW( + DataTypes.FIELD(0, "k0", DataTypes.INT()), + DataTypes.FIELD(1, "k1", DataTypes.INT()), + DataTypes.FIELD(2, "v", DataTypes.STRING()), + DataTypes.FIELD(3, "seq", DataTypes.INT())); + + FieldNestedUpdateAgg agg = + new FieldNestedUpdateAgg( + FieldNestedUpdateAggFactory.NAME, + DataTypes.ARRAY(elementRowType), + Arrays.asList("k0", "k1"), + CoreOptions.NestedKeyNullStrategy.MERGE, // use merge strategy + Collections.singletonList("seq"), + 3); + + InternalArray accumulator = null; + InternalArray.ElementGetter elementGetter = + InternalArray.createElementGetter(elementRowType); + + accumulator = (InternalArray) agg.agg(accumulator, singletonArray(row(0, 1, "B", 1))); + accumulator = + (InternalArray) agg.agg(accumulator, singletonArray(row(null, 2, "NULL_2", 2))); + accumulator = + (InternalArray) + agg.agg(accumulator, singletonArray(row(null, null, "NULL_NULL", 3))); + accumulator = (InternalArray) agg.agg(accumulator, singletonArray(row(1, 2, "C", 5))); + + accumulator = + (InternalArray) agg.agg(accumulator, singletonArray(row(0, 1, "B_updated", 4))); + assertThat(unnest(accumulator, elementGetter)) + .containsExactlyInAnyOrderElementsOf( + Arrays.asList( + row(0, 1, "B_updated", 4), + row(null, 2, "NULL_2", 2), + row(null, null, "NULL_NULL", 3))); + } + + @Test + public void testFieldNestedUpdateAggWithCountLimitWhenNestedKeyNullUseIgnoreStrategy() { + DataType elementRowType = + DataTypes.ROW( + DataTypes.FIELD(0, "k0", DataTypes.INT()), + DataTypes.FIELD(1, "k1", DataTypes.INT()), + DataTypes.FIELD(2, "v", DataTypes.STRING()), + DataTypes.FIELD(3, "seq", DataTypes.INT())); + + FieldNestedUpdateAgg agg = + new FieldNestedUpdateAgg( + FieldNestedUpdateAggFactory.NAME, + DataTypes.ARRAY(elementRowType), + Arrays.asList("k0", "k1"), + CoreOptions.NestedKeyNullStrategy.IGNORE, // use ignore strategy + Collections.singletonList("seq"), + 3); + + InternalArray accumulator = null; + InternalArray.ElementGetter elementGetter = + InternalArray.createElementGetter(elementRowType); + + accumulator = (InternalArray) agg.agg(accumulator, singletonArray(row(0, 1, "B", 1))); + accumulator = + (InternalArray) agg.agg(accumulator, singletonArray(row(null, 2, "NULL_2", 2))); + accumulator = + (InternalArray) + agg.agg(accumulator, singletonArray(row(null, null, "NULL_NULL", 3))); + accumulator = (InternalArray) agg.agg(accumulator, singletonArray(row(1, 2, "C", 3))); + + accumulator = + (InternalArray) agg.agg(accumulator, singletonArray(row(0, 1, "B_updated", 4))); + assertThat(unnest(accumulator, elementGetter)) + .containsExactlyInAnyOrderElementsOf( + Arrays.asList(row(0, 1, "B_updated", 4), row(1, 2, "C", 3))); + + accumulator = (InternalArray) agg.agg(accumulator, singletonArray(row(2, 3, "D", 5))); + assertThat(unnest(accumulator, elementGetter)) + .containsExactlyInAnyOrderElementsOf( + Arrays.asList( + row(0, 1, "B_updated", 4), row(1, 2, "C", 3), row(2, 3, "D", 5))); + } + + @Test + public void testFieldNestedUpdateAggWithCountLimitWhenNestedKeyNullUseThrowErrorStrategy() { + DataType elementRowType = + DataTypes.ROW( + DataTypes.FIELD(0, "k0", DataTypes.INT()), + DataTypes.FIELD(1, "k1", DataTypes.INT()), + DataTypes.FIELD(2, "v", DataTypes.STRING()), + DataTypes.FIELD(3, "seq", DataTypes.INT())); + + FieldNestedUpdateAgg agg = + new FieldNestedUpdateAgg( + FieldNestedUpdateAggFactory.NAME, + DataTypes.ARRAY(elementRowType), + Arrays.asList("k0", "k1"), + CoreOptions.NestedKeyNullStrategy.ERROR, // use error strategy + Collections.singletonList("seq"), + 3); + + InternalArray accumulator = null; + InternalArray.ElementGetter elementGetter = + InternalArray.createElementGetter(elementRowType); + + accumulator = (InternalArray) agg.agg(accumulator, singletonArray(row(0, 1, "B", 1))); + + InternalArray finalAccumulator = accumulator; + assertThatThrownBy( + () -> agg.agg(finalAccumulator, singletonArray(row(null, 2, "NULL_2", 2)))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage( + "Nested key contains null values. Primary key fields must not be null."); + + assertThatThrownBy( + () -> + agg.agg( + finalAccumulator, + singletonArray(row(null, null, "NULL_NULL", 3)))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage( + "Nested key contains null values. Primary key fields must not be null."); + + accumulator = (InternalArray) agg.agg(accumulator, singletonArray(row(1, 2, "C", 3))); + + accumulator = + (InternalArray) agg.agg(accumulator, singletonArray(row(0, 1, "B_updated", 4))); + assertThat(unnest(accumulator, elementGetter)) + .containsExactlyInAnyOrderElementsOf( + Arrays.asList(row(0, 1, "B_updated", 4), row(1, 2, "C", 3))); + + accumulator = (InternalArray) agg.agg(accumulator, singletonArray(row(2, 3, "D", 5))); + assertThat(unnest(accumulator, elementGetter)) + .containsExactlyInAnyOrderElementsOf( + Arrays.asList( + row(0, 1, "B_updated", 4), row(1, 2, "C", 3), row(2, 3, "D", 5))); + } + + @Test + public void testFieldNestedUpdateAggRetractAppliesNestedKeyNullStrategyToAccumulator() { + DataType elementRowType = + DataTypes.ROW( + DataTypes.FIELD(0, "k0", DataTypes.INT()), + DataTypes.FIELD(1, "k1", DataTypes.INT()), + DataTypes.FIELD(2, "v", DataTypes.STRING())); + + // Build an accumulator containing a null nested key. + FieldNestedUpdateAgg mergeAgg = + new FieldNestedUpdateAgg( + FieldNestedUpdateAggFactory.NAME, + DataTypes.ARRAY(elementRowType), + Arrays.asList("k0", "k1"), + CoreOptions.NestedKeyNullStrategy.MERGE, + Collections.emptyList(), + Integer.MAX_VALUE); + + InternalArray accumulator = null; + accumulator = (InternalArray) mergeAgg.agg(accumulator, singletonArray(row(0, null, "A"))); + accumulator = (InternalArray) mergeAgg.agg(accumulator, singletonArray(row(1, 0, "B"))); + accumulator = (InternalArray) mergeAgg.agg(accumulator, singletonArray(row(1, 1, "C"))); + + // ============================================================ + // Verify IGNORE behavior: + // rows with null nested keys in the accumulator should be ignored. + // ============================================================ + FieldNestedUpdateAgg ignoreAgg = + new FieldNestedUpdateAgg( + FieldNestedUpdateAggFactory.NAME, + DataTypes.ARRAY(elementRowType), + Arrays.asList("k0", "k1"), + CoreOptions.NestedKeyNullStrategy.IGNORE, + Collections.emptyList(), + Integer.MAX_VALUE); + + InternalArray result = + (InternalArray) ignoreAgg.retract(accumulator, singletonArray(row(1, 0, "B"))); + + InternalArray.ElementGetter elementGetter = + InternalArray.createElementGetter(elementRowType); + + assertThat(unnest(result, elementGetter)).containsExactly(row(1, 1, "C")); + + // ============================================================ + // Verify ERROR behavior: + // rows with null nested keys in the accumulator should throw exception. + // ============================================================ + FieldNestedUpdateAgg errorAgg = + new FieldNestedUpdateAgg( + FieldNestedUpdateAggFactory.NAME, + DataTypes.ARRAY(elementRowType), + Arrays.asList("k0", "k1"), + CoreOptions.NestedKeyNullStrategy.ERROR, + Collections.emptyList(), + Integer.MAX_VALUE); + + final InternalArray acc = accumulator; + + assertThatThrownBy(() -> errorAgg.retract(acc, singletonArray(row(1, 0, "B")))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage( + "Nested key contains null values. Primary key fields must not be null."); + } + + @Test + public void testFieldNestedUpdateAggRetractAppliesNestedKeyNullStrategyToRetractInput() { + DataType elementRowType = + DataTypes.ROW( + DataTypes.FIELD(0, "k0", DataTypes.INT()), + DataTypes.FIELD(1, "k1", DataTypes.INT()), + DataTypes.FIELD(2, "v", DataTypes.STRING())); + + // Build an accumulator without null nested keys. + FieldNestedUpdateAgg mergeAgg = + new FieldNestedUpdateAgg( + FieldNestedUpdateAggFactory.NAME, + DataTypes.ARRAY(elementRowType), + Arrays.asList("k0", "k1"), + CoreOptions.NestedKeyNullStrategy.MERGE, + Collections.emptyList(), + Integer.MAX_VALUE); + + InternalArray accumulator = null; + accumulator = (InternalArray) mergeAgg.agg(accumulator, singletonArray(row(0, 0, "A"))); + accumulator = (InternalArray) mergeAgg.agg(accumulator, singletonArray(row(1, 1, "B"))); + + InternalArray.ElementGetter elementGetter = + InternalArray.createElementGetter(elementRowType); + + // ============================================================ + // Verify IGNORE behavior: + // rows with null nested keys in the retract input should be ignored. + // ============================================================ + FieldNestedUpdateAgg ignoreAgg = + new FieldNestedUpdateAgg( + FieldNestedUpdateAggFactory.NAME, + DataTypes.ARRAY(elementRowType), + Arrays.asList("k0", "k1"), + CoreOptions.NestedKeyNullStrategy.IGNORE, + Collections.emptyList(), + Integer.MAX_VALUE); + + InternalArray result = + (InternalArray) ignoreAgg.retract(accumulator, singletonArray(row(0, null, "X"))); + + assertThat(unnest(result, elementGetter)) + .containsExactlyInAnyOrder(row(0, 0, "A"), row(1, 1, "B")); + + // ============================================================ + // Verify ERROR behavior: + // rows with null nested keys in the retract input should throw exception. + // ============================================================ + FieldNestedUpdateAgg errorAgg = + new FieldNestedUpdateAgg( + FieldNestedUpdateAggFactory.NAME, + DataTypes.ARRAY(elementRowType), + Arrays.asList("k0", "k1"), + CoreOptions.NestedKeyNullStrategy.ERROR, + Collections.emptyList(), + Integer.MAX_VALUE); + + final InternalArray acc = accumulator; + + assertThatThrownBy(() -> errorAgg.retract(acc, singletonArray(row(0, null, "X")))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage( + "Nested key contains null values. Primary key fields must not be null."); + } + private List unnest(InternalArray array, InternalArray.ElementGetter elementGetter) { return IntStream.range(0, array.size()) .mapToObj(i -> elementGetter.getElementOrNull(array, i))