Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/docs/primary-key-table/merge-engine/aggregation.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,14 @@ public static class BitmapContainsUDF extends ScalarFunction {

Use `fields.<field-name>.nested-key=pk0,pk1,...` to specify the primary keys of the nested table. If no keys, row will be appended to array\<row>.

Use `fields.<field-name>.nested-key-null-strategy=<merge|ignore|error>` to specify how rows are handled when the nested-key does not satisfy primary key semantics (e.g., primary key fields contain null values).

This option is optional. If it is not configured, the behavior is equivalent to using `merge`.

- `merge`: Merge rows even if the nested-key does not satisfy primary key semantics. This is the same behavior as when this option is not configured.
- `ignore`: Ignore rows whose nested-key contains null values because they do not satisfy primary key semantics.
- `error`: Throw an exception if the nested-key contains null values, because primary key fields must not be null.

Use `fields.<field-name>.nested-sequence-field=seq0,seq1,...` to control the update sequence of a nested table, you must configure `fields.<field-name>.nested-key` when using it.

Use `fields.<field-name>.count-limit=<Integer>` to specify the maximum number of rows in the nested table. When no nested-key, it will select data
Expand Down
42 changes: 42 additions & 0 deletions paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ public class CoreOptions implements Serializable {

public static final String NESTED_KEY = "nested-key";

public static final String NESTED_KEY_NULL_STRATEGY = "nested-key-null-strategy";

public static final String NESTED_SEQUENCE_FIELD = "nested-sequence-field";

public static final String COUNT_LIMIT = "count-limit";
Expand Down Expand Up @@ -2943,6 +2945,13 @@ public List<String> fieldNestedUpdateAggNestedKey(String fieldName) {
return Arrays.stream(keyString.split(",")).map(String::trim).collect(Collectors.toList());
}

public NestedKeyNullStrategy fieldNestedUpdateAggNestedKeyNullStrategy(String fieldName) {
return options.get(
key(FIELDS_PREFIX + "." + fieldName + "." + NESTED_KEY_NULL_STRATEGY)
.enumType(NestedKeyNullStrategy.class)
.noDefaultValue());
}

public List<String> fieldNestedUpdateAggNestedSequenceField(String fieldName) {
String keyString =
options.get(
Expand Down Expand Up @@ -5046,4 +5055,37 @@ public InlineElement getDescription() {
return text(description);
}
}

/** Strategy for handling rows whose nested-key contains null values. */
public enum NestedKeyNullStrategy implements DescribedEnum {
MERGE(
"merge",
"Merge rows even if the nested-key contains null values, without enforcing primary key semantics."),

IGNORE(
"ignore",
"Ignore rows whose nested-key contains null values because they do not satisfy primary key semantics."),

ERROR(
"error",
"Throw an exception if the nested-key contains null values, because primary key fields must not be null.");

private final String value;
private final String description;

NestedKeyNullStrategy(String value, String description) {
this.value = value;
this.description = description;
}

@Override
public InlineElement getDescription() {
return text(description);
}

@Override
public String toString() {
return value;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.mergetree.compact.aggregate;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.codegen.Projection;
import org.apache.paimon.codegen.RecordComparator;
import org.apache.paimon.codegen.RecordEqualiser;
Expand All @@ -32,7 +33,7 @@
import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -61,17 +62,14 @@ public class FieldNestedUpdateAgg extends FieldAggregator {
@Nullable private final RecordComparator sequenceComparator;
private final boolean hasSequenceField;

private final CoreOptions.NestedKeyNullStrategy nestedKeyNullStrategy;
private final int countLimit;

public FieldNestedUpdateAgg(
String name, ArrayType dataType, List<String> nestedKey, int countLimit) {
this(name, dataType, nestedKey, Collections.emptyList(), countLimit);
}

public FieldNestedUpdateAgg(
String name,
ArrayType dataType,
List<String> nestedKey,
CoreOptions.NestedKeyNullStrategy nestedKeyNullStrategy,
List<String> nestedSequenceField,
int countLimit) {
super(name, dataType);
Expand All @@ -85,12 +83,24 @@ public FieldNestedUpdateAgg(
this.elementEqualiser = null;
}

checkArgument(
nestedKeyNullStrategy == null || this.keyProjection != null,
"Option 'fields.<field-name>.nested-key-null-strategy' requires "
+ "'fields.<field-name>.nested-key' to be configured.");

// Default to MERGE to preserve the previous behavior.
this.nestedKeyNullStrategy =
nestedKeyNullStrategy == null
? CoreOptions.NestedKeyNullStrategy.MERGE
: nestedKeyNullStrategy;

// If nestedSequenceField is set, we need to compare sequence fields to determine
// whether to update. Only update when the new sequence is greater than the old one.
if (!nestedSequenceField.isEmpty()) {
checkArgument(
this.keyProjection != null,
"nested-sequence-field requires nested-key to be set.");
"Option 'fields.<field-name>.nested-sequence-field' requires "
+ "'fields.<field-name>.nested-key' to be configured.");
this.sequenceProjection = newProjection(nestedType, nestedSequenceField);
this.hasSequenceField = true;

Expand Down Expand Up @@ -243,6 +253,11 @@ private void addNestedRows(

InternalRow row = array.getRow(i, nestedFields);
BinaryRow key = keyProjection.apply(row).copy();

if (!applyNestedKeyNullStrategy(key)) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This applies the new strategy only while merging rows in agg(). The retract() path still builds/removes keys with keyProjection directly, so with nested-key-null-strategy=error a retraction containing a null nested key will not throw, and with ignore it can still process a null-key retraction instead of ignoring it. Both AggregateMergeFunction and PartialUpdateMergeFunction route retract records through FieldAggregator.retract(...), so the new option should be enforced there as well (or explicitly documented/tested as agg-only).

continue;
}

InternalRow existing = rows.get(key);
if (existing != null) {
if (!hasSequenceField || compareSequence(row, existing) >= 0) {
Expand All @@ -253,4 +268,27 @@ private void addNestedRows(
}
}
}

private boolean applyNestedKeyNullStrategy(BinaryRow key) {
if (!key.anyNull()) {
// The nested-key satisfies primary key semantics.
return true;
}
switch (nestedKeyNullStrategy) {
case MERGE:
// Preserve the previous behavior.
return true;
case IGNORE:
return false;
case ERROR:
throw new IllegalArgumentException(
"Nested key contains null values. Primary key fields must not be null.");
default:
throw new UnsupportedOperationException(
String.format(
"Unsupported nested-key-null-strategy '%s'. Supported values are: %s.",
nestedKeyNullStrategy,
Arrays.toString(CoreOptions.NestedKeyNullStrategy.values())));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public FieldNestedUpdateAgg create(DataType fieldType, CoreOptions options, Stri
return createFieldNestedUpdateAgg(
fieldType,
options.fieldNestedUpdateAggNestedKey(field),
options.fieldNestedUpdateAggNestedKeyNullStrategy(field),
options.fieldNestedUpdateAggNestedSequenceField(field),
options.fieldNestedUpdateAggCountLimit(field));
}
Expand All @@ -51,6 +52,7 @@ public String identifier() {
private FieldNestedUpdateAgg createFieldNestedUpdateAgg(
DataType fieldType,
List<String> nestedKey,
CoreOptions.NestedKeyNullStrategy nestedKeyNullStrategy,
List<String> nestedSequenceField,
int countLimit) {
if (nestedKey == null) {
Expand All @@ -68,6 +70,11 @@ private FieldNestedUpdateAgg createFieldNestedUpdateAgg(
checkArgument(arrayType.getElementType() instanceof RowType, typeErrorMsg, fieldType);

return new FieldNestedUpdateAgg(
identifier(), arrayType, nestedKey, nestedSequenceField, countLimit);
identifier(),
arrayType,
nestedKey,
nestedKeyNullStrategy,
nestedSequenceField,
countLimit);
}
}
Loading
Loading