Skip to content

PPL patterns command (BRAIN + SIMPLE) on the analytics-engine route#21797

Merged
mch2 merged 29 commits into
opensearch-project:mainfrom
ahkcs:feat/patterns-brain-datafusion
May 28, 2026
Merged

PPL patterns command (BRAIN + SIMPLE) on the analytics-engine route#21797
mch2 merged 29 commits into
opensearch-project:mainfrom
ahkcs:feat/patterns-brain-datafusion

Conversation

@ahkcs
Copy link
Copy Markdown
Contributor

@ahkcs ahkcs commented May 21, 2026

What

Wires PPL patterns (BRAIN + SIMPLE methods, label + aggregation modes) through the analytics-engine route, including the OpenSearch Dashboards BRAIN-label panel query:

patterns message method=brain mode=label
| stats count() as pattern_count, take(message, 1) as sample_logs by patterns_field
| sort - pattern_count
| fields patterns_field, pattern_count, sample_logs

Companion to opensearch-project/sql#5467 (UnifiedQueryContext PATTERN_* defaults + dashboard IT + planner unit test). Both PRs are required.

Changes

Rust / DataFusion (sandbox/plugins/analytics-backend-datafusion)

  • BRAIN classifier ported to Rust (scaffold → full classifier → scalar eval functions → PATTERN_PARSER UDF wiring).
  • internal_pattern window UDF (BRAIN label mode) + aggregate UDF (BRAIN aggregation mode).
  • Per-field scalar UDFs pattern_parser_get_pattern / pattern_parser_get_tokens — workaround for DataFusion's substrait consumer's "Direct reference StructField with child not supported".
  • SAFE_CAST / ITEM overloads for ARRAY + MAP returns; FORCE_NULLABLE on LOCAL_TAKE_OP / LOCAL_ARRAY_AGG_OP.

Analytics-engine planner (sandbox/plugins/analytics-engine)

  • PplWindowCallRewriter — bottom-up RelNode shuttle that retypes RexInputRefs before copy() so the BRAIN window-UDF result type propagates through enclosing Projects.
  • OpenSearchAggregateSplitRule — skip PARTIAL/FINAL split when STATE_EXPANDING aggregates (TAKE/FIRST/LAST/LIST/VALUES) are present (avoids argList-shift crash on take(field, 1) in stats).
  • ArrowValues.toJavaValue — recursively unwrap Arrow Text so nested Map<String, List<String>> returns hand the SQL layer pure Java types.

DataFusion regexp_replace adapter (RegexpReplaceAdapter)

  • Append "g" flag to every 3-arg REGEXP_REPLACE going to DataFusion. DataFusion's regexp_replace defaults to first-match-only; Calcite's 3-arg form is already replace-all. The append preserves PPL contract on both backends — no SQL-core knowledge of DataFusion semantics required (companion change in feat(ppl): wire patterns command for analytics-engine dashboard route sql#5467 keeps SQL core on plain 3-arg).

Capabilities + substrait wiring

  • WindowFunction.PATTERN, AggregateFunction.PATTERN.
  • opensearch_window_functions.yaml registers internal_pattern window-UDF variants.
  • DataFusionFragmentConvertor operator declarations + ADDITIONAL_*_SIGS for isthmus binding.

Results

Suite Before After
CalcitePPLDashboardPatternsIT (new) 1 / 1
CalcitePPLPatternsIT (analytics-engine route) 3 / 15 10 / 15

Remaining 5 / 15: 3 × BrainAggregationMode_* (planner needs LogicalCorrelate post-aggregate support) + 2 × Brain*Mode_ShowNumberedToken (substrait type wiring for _ShowNumberedToken). Tracked separately, none affect the dashboard query.

@ahkcs ahkcs requested a review from a team as a code owner May 21, 2026 23:50
@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 21, 2026

PR Reviewer Guide 🔍

(Review updated until commit 216ae28)

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 No multiple PR themes
⚡ Recommended focus areas for review

Possible Issue

The adapter now unconditionally appends a "g" flag for 3-arg REGEXP_REPLACE calls, switching them to the 4-arg PG form. If the original pattern or replacement already contained logic that assumed single-match semantics (e.g., a user explicitly avoiding global replacement), this change silently alters behavior. The Java path did not append "g" by default; this divergence may break queries that relied on single-match semantics.

boolean appendGlobalFlag = original.getOperator() == SqlLibraryOperators.REGEXP_REPLACE_3 && original.getOperands().size() == 3;

if (rewrittenPattern == null && rewrittenReplacement == null && !appendGlobalFlag) {
    return original;
}

RexBuilder rexBuilder = cluster.getRexBuilder();
// makeLiteral(String) sizes CHAR to the new value; reusing original type would right-pad.
List<RexNode> newOperands = new ArrayList<>(original.getOperands().size() + (appendGlobalFlag ? 1 : 0));
newOperands.add(original.getOperands().get(0));
newOperands.add(rewrittenPattern != null ? rexBuilder.makeLiteral(rewrittenPattern) : patternOperand);
newOperands.add(rewrittenReplacement != null ? rexBuilder.makeLiteral(rewrittenReplacement) : replacementOperand);
for (int i = 3; i < original.getOperands().size(); i++) {
    newOperands.add(original.getOperands().get(i));
}
if (appendGlobalFlag) {
    newOperands.add(rexBuilder.makeLiteral("g", rexBuilder.getTypeFactory().createSqlType(SqlTypeName.VARCHAR), true));
    return rexBuilder.makeCall(original.getType(), SqlLibraryOperators.REGEXP_REPLACE_PG_4, newOperands);
}
Possible Issue

The new isStateExpanding check skips the PARTIAL/FINAL split for any state-expanding aggregate, but the method silently returns false if fromSqlAggFunction throws. This means an unrecognized aggregate operator (e.g., a custom UDF with SqlKind.OTHER) will bypass the split without logging or error, potentially causing incorrect execution when the aggregate actually requires single-stage semantics.

private static boolean isStateExpanding(SqlAggFunction op) {
    try {
        return AggregateFunction.fromSqlAggFunction(op).getType() == AggregateFunction.Type.STATE_EXPANDING;
    } catch (IllegalStateException ignored) {
        return false;
    }
}
Possible Issue

The PATTERN case constructs an explicit return type via internalPatternReturnType, but if the aggregate call's input schema changes (e.g., a column is renamed or reordered), the hardcoded field names ("pattern", "pattern_count", "tokens", "sample_logs") may not align with the actual runtime schema. The rewriter does not validate that the input matches the expected shape, so a schema mismatch will surface as a runtime Arrow type error rather than a planning-time failure.

case "PATTERN" -> {
    // PPL declares ARRAY<MAP<VARCHAR, ANY>>; substrait can't carry ANY.
    targetOp = DataFusionFragmentConvertor.LOCAL_INTERNAL_PATTERN_OP;
    explicitReturnType = internalPatternReturnType(agg.getCluster().getTypeFactory());
}
Possible Issue

The tryRewritePatternParserAccess method assumes that a 3-arg pattern_parser call (BRAIN label mode) always has the window result at operand index 1. If the planner or a future rewrite changes the operand order, the method will silently extract the wrong operand and produce incorrect results. No validation confirms that operand 1 is actually the window result.

List<RexNode> ppArgs = innerCall.getOperands();
if (ppArgs.size() == 3) {
    RexNode source = ppArgs.get(0);
    RexNode windowResult = ppArgs.get(1);
    return switch (fieldName) {
        case "pattern" -> windowResult;
        case "tokens" -> rexBuilder.makeCall(
            DataFusionFragmentConvertor.LOCAL_PATTERN_PARSER_GET_TOKENS_OP,
            List.of(windowResult, source)
        );
        default -> null;
    };
Possible Issue

The descendAndRewrite method checks inputRowTypeChanged by comparing row types after child rewriting, but it does not account for cases where the row type's field count matches but the field types differ in nullability or precision. If a child rewrite changes a field from nullable to non-nullable (or vice versa), the equality check may pass, and the parent's RexInputRefs will reference stale types, causing substrait conversion to fail with a type mismatch.

    for (int i = 0; i < oldInputs.size(); i++) {
        if (!oldInputs.get(i).getRowType().equals(newInputs.get(i).getRowType())) {
            inputRowTypeChanged = true;
            break;
        }
    }
}

@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 21, 2026

PR Code Suggestions ✨

Latest suggestions up to 216ae28

Explore these optional code suggestions:

CategorySuggestion                                                                                                                                    Impact
Possible issue
Fix ambiguous optional parameter parsing

The argument parsing logic for optional parameters at indices 4 and 5 has ambiguous
type detection that could misinterpret arguments. If both parameters are provided,
the code tries f64 first then i64 for each position, which could incorrectly assign
values if types don't match expectations. Use explicit positional semantics based on
the documented argument order.

sandbox/plugins/analytics-backend-datafusion/rust/src/udwf/internal_pattern.rs [207-220]

 fn from_args(values: &[ArrayRef]) -> Self {
     ...
     if let Some(arr) = values.get(4) {
         if let Some(v) = read_f64_at(arr, 0) {
             cfg.threshold_percentage = v;
-        } else if let Some(v) = read_i64_at(arr, 0) {
-            cfg.variable_count_threshold = v.max(0) as usize;
         }
     }
     if let Some(arr) = values.get(5) {
         if let Some(v) = read_i64_at(arr, 0) {
             cfg.variable_count_threshold = v.max(0) as usize;
-        } else if let Some(v) = read_f64_at(arr, 0) {
-            cfg.threshold_percentage = v;
         }
     }
     cfg
 }
Suggestion importance[1-10]: 8

__

Why: The current logic tries both f64 and i64 for each position, which could incorrectly assign values if the type order doesn't match expectations. The improved code enforces strict positional semantics matching the documented argument order (arg 4 = threshold_percentage as f64, arg 5 = variable_count_threshold as i64), preventing potential type confusion bugs.

Medium
General
Handle UTF-8 boundaries in substring search

The bytewise substring search assumes ASCII input but the comment acknowledges
non-ASCII would need char boundary handling. For UTF-8 strings with multi-byte
characters, slicing at arbitrary byte offsets can panic or produce invalid results.
Use str::find on the original string slices instead of byte-level comparison.

sandbox/plugins/analytics-backend-datafusion/rust/src/patterns/utils.rs [183-191]

-fn find_subsequence(haystack: &[u8], needle: &[u8]) -> Option<usize> {
-    if needle.is_empty() {
-        return Some(0);
-    }
-    if needle.len() > haystack.len() {
-        return None;
-    }
-    (0..=haystack.len() - needle.len()).find(|&i| &haystack[i..i + needle.len()] == needle)
+fn find_subsequence_str(haystack: &str, needle: &str) -> Option<usize> {
+    haystack.find(needle)
 }
Suggestion importance[1-10]: 7

__

Why: The bytewise search can panic or produce incorrect results with multi-byte UTF-8 characters. While the comment acknowledges this limitation and mentions ASCII test data, using str::find is safer and more idiomatic for production code that may encounter non-ASCII input.

Medium
Filter nulls from pattern corpus

Using unwrap_or_default() silently converts null values to empty strings, which may
skew pattern detection by treating missing data as valid empty input. Consider
filtering out nulls or handling them explicitly to avoid polluting the BRAIN corpus
with artificial empty entries.

sandbox/plugins/analytics-backend-datafusion/rust/src/udwf/internal_pattern.rs [136-139]

 let mut messages: Vec<String> = Vec::with_capacity(num_rows);
 for i in 0..num_rows {
-    messages.push(read_string_at(field_col, i).unwrap_or_default());
+    if let Some(msg) = read_string_at(field_col, i) {
+        if !msg.is_empty() {
+            messages.push(msg);
+        }
+    }
 }
Suggestion importance[1-10]: 6

__

Why: Converting nulls to empty strings via unwrap_or_default() may skew BRAIN pattern detection by treating missing data as valid empty input. Filtering out nulls and empty strings ensures the corpus only contains meaningful data, improving pattern quality.

Low
Use sorted slice head for max frequency

The find_candidate function computes max_freq by iterating the entire slice, then
iterates again to find the first entry above the threshold. Since the slice is
already sorted by (same_freq_count desc, word_freq desc), the maximum word_freq is
the first element's word_freq. Use sorted[0].word_freq directly to avoid the
redundant iteration.

sandbox/plugins/analytics-backend-datafusion/rust/src/patterns/brain.rs [229-239]

 fn find_candidate(sorted: &[WordCombination], threshold_percentage: f64) -> WordCombination {
     assert!(!sorted.is_empty(), "Sorted word combinations must be non-empty");
-    let max_freq = sorted.iter().map(|w| w.word_freq).max().unwrap_or(0);
+    let max_freq = sorted[0].word_freq;
     let threshold = (max_freq as f64) * threshold_percentage;
     for w in sorted {
         if (w.word_freq as f64) > threshold {
             return *w;
         }
     }
     sorted[0]
 }
Suggestion importance[1-10]: 5

__

Why: The slice is sorted by (same_freq_count desc, word_freq desc), so sorted[0].word_freq is not necessarily the maximum word_freq across all elements (a later element with lower same_freq_count could have higher word_freq). The suggestion's premise is incorrect, but the redundant iteration could be avoided by tracking max during the sort. Score reflects the valid observation of redundancy but incorrect fix.

Low
Reserve capacity before bulk string append

The append_strings method allocates a new String for every non-null cell via
to_string(). For large log corpora this creates memory pressure. Consider using
strs.value(i) directly (which returns &str) and only allocating when the buffer is
serialized, or use a Cow to defer cloning until necessary.

sandbox/plugins/analytics-backend-datafusion/rust/src/udaf/internal_pattern.rs [323-330]

 fn append_strings(&mut self, arr: &ArrayRef) -> Result<()> {
     if let Some(strs) = arr.as_any().downcast_ref::<StringArray>() {
+        self.buffer.reserve(strs.len());
         for i in 0..strs.len() {
             if !strs.is_null(i) {
                 self.buffer.push(strs.value(i).to_string());
             }
         }
         return Ok(());
     }
     ...
 }
Suggestion importance[1-10]: 4

__

Why: Adding self.buffer.reserve(strs.len()) before the loop reduces reallocations when appending many strings. The improvement is minor (allocation overhead vs. per-element push) but valid for large log corpora.

Low

Previous suggestions

Suggestions up to commit 4c623c3
CategorySuggestion                                                                                                                                    Impact
Possible issue
Fix reversed type fallback logic

The argument parsing logic attempts to handle both f64 and i64 types for args 4 and
5, but the fallback branches are reversed. According to the comment, arg 4 should be
frequency_threshold_percentage (f64) and arg 5 should be variable_count_threshold
(i64). The else if branches incorrectly assign the wrong type to the wrong field,
causing silent type mismatches.

sandbox/plugins/analytics-backend-datafusion/rust/src/udwf/internal_pattern.rs [207-220]

 if let Some(arr) = values.get(4) {
     if let Some(v) = read_f64_at(arr, 0) {
         cfg.threshold_percentage = v;
-    } else if let Some(v) = read_i64_at(arr, 0) {
-        cfg.variable_count_threshold = v.max(0) as usize;
     }
 }
 if let Some(arr) = values.get(5) {
     if let Some(v) = read_i64_at(arr, 0) {
         cfg.variable_count_threshold = v.max(0) as usize;
-    } else if let Some(v) = read_f64_at(arr, 0) {
-        cfg.threshold_percentage = v;
     }
 }
Suggestion importance[1-10]: 9

__

Why: The fallback branches in the argument parsing logic are incorrectly reversed. When read_f64_at fails for arg 4, it tries to read it as i64 and assign to variable_count_threshold, which contradicts the comment stating arg 4 is frequency_threshold_percentage. This is a critical logic error that will cause silent type mismatches and incorrect configuration.

High
Prevent duplicate global flag append

The global flag append logic should verify that the 4-arg form doesn't already
contain a "g" flag before appending. When a 3-arg call is rewritten to 4-arg with
"g", a subsequent pass through the adapter would append another "g", producing "gg".

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/RegexpReplaceAdapter.java [59-62]

 boolean appendGlobalFlag = original.getOperator() == SqlLibraryOperators.REGEXP_REPLACE_3 && original.getOperands().size() == 3;
 
 if (rewrittenPattern == null && rewrittenReplacement == null && !appendGlobalFlag) {
     return original;
 }
 
+// Prevent duplicate "g" flag on re-entry
+if (original.getOperator() == SqlLibraryOperators.REGEXP_REPLACE_PG_4 && original.getOperands().size() == 4) {
+    RexNode flagsOperand = original.getOperands().get(3);
+    if (flagsOperand instanceof RexLiteral) {
+        String flags = ((RexLiteral) flagsOperand).getValueAs(String.class);
+        if (flags != null && flags.contains("g")) {
+            appendGlobalFlag = false;
+        }
+    }
+}
+
Suggestion importance[1-10]: 8

__

Why: The suggestion correctly identifies that re-entry into the adapter after a 3-arg→4-arg rewrite would append another "g" flag, producing "gg". The fix adds a guard to check if the 4-arg form already contains "g" before setting appendGlobalFlag, preventing duplicate flags.

Medium
Validate aggregate argument index bounds

The arg0Type extraction assumes call.getArgList().get(0) is a valid input column
index, but aggregate calls can reference columns beyond the input row type when the
aggregate sits above a Project that computes expressions. Verify the index is within
bounds before accessing getFieldList().

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/PplAggregateCallRewriter.java [79-98]

 case "LIST", "VALUES" -> {
-    // arg0 type distinguishes PARTIAL (raw element → array_agg) from FINAL (array → list_merge).
     if (call.getArgList().isEmpty()) {
         return call;
     }
-    RelDataType arg0Type = agg.getInput().getRowType().getFieldList().get(call.getArgList().get(0)).getType();
+    int arg0Index = call.getArgList().get(0);
+    List<RelDataTypeField> inputFields = agg.getInput().getRowType().getFieldList();
+    if (arg0Index < 0 || arg0Index >= inputFields.size()) {
+        return call;
+    }
+    RelDataType arg0Type = inputFields.get(arg0Index).getType();
     boolean isFinal = arg0Type.getSqlTypeName() == SqlTypeName.ARRAY;
     if (isFinal) {
         targetOp = "VALUES".equals(name) ? DataFusionFragmentConvertor.LOCAL_LIST_MERGE_DISTINCT_OP
             : DataFusionFragmentConvertor.LOCAL_LIST_MERGE_OP;
     } else {
         targetOp = DataFusionFragmentConvertor.LOCAL_ARRAY_AGG_OP;
         forceDistinct = "VALUES".equals(name);
         explicitReturnType = agg.getCluster().getTypeFactory().createArrayType(arg0Type, -1);
     }
 }
Suggestion importance[1-10]: 7

__

Why: The suggestion correctly notes that call.getArgList().get(0) could reference an out-of-bounds column index when the aggregate sits above a Project. Adding bounds validation prevents potential IndexOutOfBoundsException at runtime.

Medium
General
Use first element for max frequency

The max_freq calculation iterates the entire slice even though sorted is already
ordered by word_freq descending (after same_freq_count). The maximum word_freq is
the first element's word_freq, making the iteration redundant.

sandbox/plugins/analytics-backend-datafusion/rust/src/patterns/brain.rs [229-239]

 fn find_candidate(sorted: &[WordCombination], threshold_percentage: f64) -> WordCombination {
     assert!(!sorted.is_empty(), "Sorted word combinations must be non-empty");
-    let max_freq = sorted.iter().map(|w| w.word_freq).max().unwrap_or(0);
+    let max_freq = sorted[0].word_freq;
     let threshold = (max_freq as f64) * threshold_percentage;
     for w in sorted {
         if (w.word_freq as f64) > threshold {
             return *w;
         }
     }
     sorted[0]
 }
Suggestion importance[1-10]: 6

__

Why: The suggestion correctly observes that sorted is ordered by word_freq descending (after same_freq_count), so sorted[0].word_freq is the maximum. Replacing the iter().map().max() with direct access eliminates redundant iteration, improving efficiency.

Low
Filter nulls from pattern corpus

Using unwrap_or_default() silently converts null values to empty strings, which may
skew pattern detection by treating missing data as valid empty input. Consider
filtering out null rows or handling them explicitly to avoid polluting the BRAIN
corpus with artificial empty-string patterns.

sandbox/plugins/analytics-backend-datafusion/rust/src/udwf/internal_pattern.rs [136-139]

 let mut messages: Vec<String> = Vec::with_capacity(num_rows);
 for i in 0..num_rows {
-    messages.push(read_string_at(field_col, i).unwrap_or_default());
+    if let Some(msg) = read_string_at(field_col, i) {
+        messages.push(msg);
+    }
 }
Suggestion importance[1-10]: 6

__

Why: Converting null values to empty strings with unwrap_or_default() may affect pattern detection quality by including artificial empty entries in the corpus. However, the downstream code at line 161-163 already handles empty strings explicitly, so the impact is mitigated. Filtering nulls would be cleaner but not critical.

Low
Pre-allocate buffer capacity for batch

The append_strings method allocates a new String for every non-null row via
to_string(). For large datasets this creates significant memory pressure. Use
strs.value(i) directly (which returns &str) and only allocate when pushing to the
buffer.

sandbox/plugins/analytics-backend-datafusion/rust/src/udaf/internal_pattern.rs [323-332]

 fn append_strings(&mut self, arr: &ArrayRef) -> Result<()> {
     if let Some(strs) = arr.as_any().downcast_ref::<StringArray>() {
+        self.buffer.reserve(strs.len());
         for i in 0..strs.len() {
             if !strs.is_null(i) {
                 self.buffer.push(strs.value(i).to_string());
             }
         }
         return Ok(());
     }
     ...
     exec_err!("{}: expected string column, got {:?}", NAME, arr.data_type())
 }
Suggestion importance[1-10]: 5

__

Why: The suggestion adds self.buffer.reserve(strs.len()) to pre-allocate capacity before pushing strings, reducing reallocation overhead. While correct, the impact is moderate—Vec::push already amortizes growth, so the improvement is incremental rather than critical.

Low
Use safer window-based substring search

The bytewise substring search assumes ASCII input but may produce incorrect offsets
for UTF-8 strings containing multi-byte characters. When original contains non-ASCII
characters, byte offsets won't align with character boundaries, causing
&original[pos..absolute] to panic or produce invalid UTF-8 slices.

sandbox/plugins/analytics-backend-datafusion/rust/src/patterns/utils.rs [183-191]

 fn find_subsequence(haystack: &[u8], needle: &[u8]) -> Option<usize> {
     if needle.is_empty() {
         return Some(0);
     }
     if needle.len() > haystack.len() {
         return None;
     }
-    (0..=haystack.len() - needle.len()).find(|&i| &haystack[i..i + needle.len()] == needle)
+    haystack.windows(needle.len()).position(|window| window == needle)
 }
Suggestion importance[1-10]: 3

__

Why: While the suggested windows().position() approach is cleaner, the existing implementation already handles the same logic correctly. The comment in the code acknowledges the ASCII limitation, and the improvement is primarily stylistic rather than fixing a bug.

Low
Suggestions up to commit 87a268c
CategorySuggestion                                                                                                                                    Impact
Possible issue
Fix ambiguous optional parameter parsing

The argument parsing logic for optional parameters at indices 4 and 5 has ambiguous
type detection that could assign values to wrong fields. If both parameters are
provided, the code tries float first then int for each position, which doesn't match
the documented ordering where arg 4 should be frequency_threshold_percentage
(double) and arg 5 should be variable_count_threshold (int).

sandbox/plugins/analytics-backend-datafusion/rust/src/udwf/internal_pattern.rs [207-220]

 fn from_args(values: &[ArrayRef]) -> Self {
     ...
     if let Some(arr) = values.get(4) {
         if let Some(v) = read_f64_at(arr, 0) {
             cfg.threshold_percentage = v;
-        } else if let Some(v) = read_i64_at(arr, 0) {
-            cfg.variable_count_threshold = v.max(0) as usize;
         }
     }
     if let Some(arr) = values.get(5) {
         if let Some(v) = read_i64_at(arr, 0) {
             cfg.variable_count_threshold = v.max(0) as usize;
-        } else if let Some(v) = read_f64_at(arr, 0) {
-            cfg.threshold_percentage = v;
         }
     }
     cfg
 }
Suggestion importance[1-10]: 8

__

Why: The current implementation has ambiguous type detection that tries both float and int for each position, which could incorrectly assign values when both optional parameters are provided. The fix ensures arg 4 is always treated as threshold_percentage (f64) and arg 5 as variable_count_threshold (i64), matching the documented parameter ordering.

Medium
Reject negative values in scalar conversion

The signed-to-unsigned cast doesn't check for negative values before converting to
usize. A negative literal (e.g. -1 for max_sample_count) would wrap to a large
positive value, causing memory allocation issues or incorrect behavior.

sandbox/plugins/analytics-backend-datafusion/rust/src/udaf/internal_pattern.rs [273-285]

 fn scalar_to_usize(scalar: &ScalarValue) -> Result<usize> {
     match scalar {
-        ScalarValue::Int8(Some(v)) => Ok(*v as usize),
-        ScalarValue::Int16(Some(v)) => Ok(*v as usize),
-        ScalarValue::Int32(Some(v)) => Ok(*v as usize),
-        ScalarValue::Int64(Some(v)) => Ok(*v as usize),
-        ...
+        ScalarValue::Int8(Some(v)) if *v >= 0 => Ok(*v as usize),
+        ScalarValue::Int16(Some(v)) if *v >= 0 => Ok(*v as usize),
+        ScalarValue::Int32(Some(v)) if *v >= 0 => Ok(*v as usize),
+        ScalarValue::Int64(Some(v)) if *v >= 0 => Ok(*v as usize),
+        ScalarValue::UInt8(Some(v)) => Ok(*v as usize),
+        ScalarValue::UInt16(Some(v)) => Ok(*v as usize),
+        ScalarValue::UInt32(Some(v)) => Ok(*v as usize),
+        ScalarValue::UInt64(Some(v)) => Ok(*v as usize),
+        _ => exec_err!("{}: expected non-negative integer literal, got {scalar:?}", NAME),
     }
 }
Suggestion importance[1-10]: 7

__

Why: The suggestion correctly identifies a real issue where negative literals would wrap to large positive usize values, potentially causing memory allocation failures or incorrect behavior. The proposed fix adds necessary bounds checking to reject negative inputs, improving robustness.

Medium
General
Use safer substring search method

The bytewise substring search assumes ASCII input but the comment acknowledges
non-ASCII would need char boundary handling. For UTF-8 strings with multi-byte
characters, slicing at arbitrary byte offsets can panic or produce invalid strings
when converting back to &str.

sandbox/plugins/analytics-backend-datafusion/rust/src/patterns/utils.rs [183-191]

 fn find_subsequence(haystack: &[u8], needle: &[u8]) -> Option<usize> {
     if needle.is_empty() {
         return Some(0);
     }
     if needle.len() > haystack.len() {
         return None;
     }
-    (0..=haystack.len() - needle.len()).find(|&i| &haystack[i..i + needle.len()] == needle)
+    haystack
+        .windows(needle.len())
+        .position(|window| window == needle)
 }
Suggestion importance[1-10]: 6

__

Why: The suggested windows().position() approach is cleaner and more idiomatic Rust, though functionally equivalent to the existing implementation for the byte-level search. The improvement is primarily in code clarity and maintainability rather than correctness, as both handle the ASCII case correctly.

Low
Validate operand structure before access

The rewrite logic assumes pattern_parser calls always have the expected operand
structure, but doesn't validate the operand count or types before accessing them. If
a malformed call reaches this point, the code may throw an IndexOutOfBoundsException
or ClassCastException.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/ItemTypeRebuilder.java [62-84]

 private RexNode tryRewritePatternParserAccess(RexCall call, List<RexNode> operands) {
     RexCall innerCall;
     RexNode keyNode;
     if (call.getKind() == org.apache.calcite.sql.SqlKind.ITEM
         && operands.size() == 2
         && operands.get(0) instanceof RexCall ppDirect
-        && "pattern_parser".equalsIgnoreCase(ppDirect.getOperator().getName())) {
+        && "pattern_parser".equalsIgnoreCase(ppDirect.getOperator().getName())
+        && ppDirect.getOperands().size() >= 2) {
         innerCall = ppDirect;
         keyNode = operands.get(1);
     } else if ("array_element".equalsIgnoreCase(call.getOperator().getName())
+        && operands.size() == 2
+        && operands.get(0) instanceof RexCall mapExtract
+        && "map_extract".equalsIgnoreCase(mapExtract.getOperator().getName())
+        && mapExtract.getOperands().size() == 2
         ...
Suggestion importance[1-10]: 5

__

Why: The suggestion correctly identifies a potential IndexOutOfBoundsException risk when accessing ppDirect.getOperands() without validating its size. Adding && ppDirect.getOperands().size() >= 2 would prevent crashes on malformed calls, though the existing code likely never encounters such cases in practice.

Low
Validate array element type compatibility

The FINAL-phase detection relies on arg0Type being ARRAY, but the code doesn't
verify that the array's element type is compatible with the expected state shape. If
the input column is an unrelated array type, the rewrite may emit an incorrect
operator or return type.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/PplAggregateCallRewriter.java [79-98]

 case "LIST", "VALUES" -> {
     if (call.getArgList().isEmpty()) {
         return call;
     }
     int arg0Index = call.getArgList().get(0);
     if (arg0Index < 0 || arg0Index >= agg.getInput().getRowType().getFieldCount()) {
         return call;
     }
     RelDataType arg0Type = agg.getInput().getRowType().getFieldList().get(arg0Index).getType();
     boolean isFinal = arg0Type.getSqlTypeName() == SqlTypeName.ARRAY;
-    ...
+    if (isFinal) {
+        targetOp = "VALUES".equalsIgnoreCase(name) ? DataFusionFragmentConvertor.LOCAL_LIST_MERGE_DISTINCT_OP
+            : DataFusionFragmentConvertor.LOCAL_LIST_MERGE_OP;
+    } else {
+        targetOp = DataFusionFragmentConvertor.LOCAL_ARRAY_AGG_OP;
+        explicitReturnType = agg.getCluster().getTypeFactory().createArrayType(arg0Type, -1);
+    }
 }
Suggestion importance[1-10]: 4

__

Why: The suggestion correctly identifies that the code doesn't validate the array element type before assuming it's a FINAL-phase state, but the improved_code snippet is identical to the existing_code and doesn't add any validation. The concern is valid but the proposed fix doesn't address it.

Low
Ensure sample size consistency

When max_sample_count is smaller than the partition size, BRAIN only processes a
subset of messages but eval_agg is called on all rows. This could result in poor
pattern matches for rows not included in the sample corpus, especially if the
partition has diverse log formats.

sandbox/plugins/analytics-backend-datafusion/rust/src/udwf/internal_pattern.rs [142-152]

-let stats = parser.parse_all_log_patterns(&messages, cfg.max_sample_count);
+let sample_size = cfg.max_sample_count.min(num_rows);
+let sampled_messages: Vec<String> = messages.iter().take(sample_size).cloned().collect();
+let stats = parser.parse_all_log_patterns(&sampled_messages, sample_size);
 let candidates: Vec<AggCandidate> = stats
     .values()
     .map(|entry: &PatternEntry| AggCandidate {
         pattern: entry.pattern.clone(),
     })
     .collect();
Suggestion importance[1-10]: 4

__

Why: While the suggestion correctly identifies that max_sample_count may be smaller than num_rows, the proposed change doesn't address the core concern. The BRAIN parser already handles sampling internally via its max_sample_count parameter, so explicitly limiting the input messages would be redundant and potentially alter the algorithm's behavior.

Low
Separate no-op and flag-append paths

The global flag append logic is unconditional for all 3-arg calls, but the adapter
should only append "g" when the pattern/replacement rewrite didn't already switch to
the 4-arg form. If a rewrite occurred and the operator is still REGEXP_REPLACE_3,
the flag append is correct; otherwise the condition may cause double-appending or
incorrect operator selection.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/RegexpReplaceAdapter.java [59-62]

-boolean appendGlobalFlag = original.getOperator() == SqlLibraryOperators.REGEXP_REPLACE_3 && original.getOperands().size() == 3;
-
-if (rewrittenPattern == null && rewrittenReplacement == null && !appendGlobalFlag) {
+if (rewrittenPattern == null && rewrittenReplacement == null) {
+    if (original.getOperator() == SqlLibraryOperators.REGEXP_REPLACE_3 && original.getOperands().size() == 3) {
+        List<RexNode> newOperands = new ArrayList<>(original.getOperands());
+        newOperands.add(rexBuilder.makeLiteral("g", rexBuilder.getTypeFactory().createSqlType(SqlTypeName.VARCHAR), true));
+        return rexBuilder.makeCall(original.getType(), SqlLibraryOperators.REGEXP_REPLACE_PG_4, newOperands);
+    }
     return original;
 }
 
+boolean appendGlobalFlag = original.getOperator() == SqlLibraryOperators.REGEXP_REPLACE_3 && original.getOperands().size() == 3;
+// ... rest of the rewrite logic
+
Suggestion importance[1-10]: 3

__

Why: The suggestion identifies a potential logic issue where the flag-append path might interact incorrectly with the rewrite logic, but the existing code already handles this correctly by checking rewrittenPattern == null && rewrittenReplacement == null && !appendGlobalFlag before returning the original. The suggested refactoring would duplicate logic without fixing an actual bug.

Low
Suggestions up to commit 6653bde
CategorySuggestion                                                                                                                                    Impact
Possible issue
Advance position past matched delimiter

After finding the next static delimiter, pos is updated to absolute (the start of
the delimiter) but doesn't advance past it. This causes the subsequent static-part
check to fail because original[pos..] still starts with the delimiter. The position
should advance by the delimiter's length.

sandbox/plugins/analytics-backend-datafusion/rust/src/patterns/utils.rs [160-168]

-let search_region = &original_bytes[pos..];
-let next_static_bytes = next_static.as_bytes();
 match find_subsequence(search_region, next_static_bytes) {
     None => return,
     Some(rel_idx) => {
         let absolute = pos + rel_idx;
         let value = &original[pos..absolute];
         add_to_result(result, token_key, value.to_string());
-        pos = absolute;
-        i += 1;
+        pos = absolute + next_static.len();
+        i += 2;
     }
 }
Suggestion importance[1-10]: 9

__

Why: Critical bug: after finding a delimiter, pos is set to the delimiter's start but doesn't advance past it. This causes the next static-part check to fail. The fix correctly advances pos by the delimiter length and skips the next iteration (i += 2), which is essential for proper pattern matching.

High
General
Add bounds check for token_order access

The function accesses token_order[token_index] without verifying that token_index is
within bounds. If the is_token and token_order vectors become desynchronized (e.g.,
due to a bug in parse_pattern), this could panic. Add a bounds check before
accessing token_order.

sandbox/plugins/analytics-backend-datafusion/rust/src/patterns/utils.rs [144-146]

-pub fn extract_variables(
-    parse_result: &ParseResult,
-    original: &str,
-    result: &mut TokensMap,
-    _prefix: &str,
-) {
+if is_token[i] {
+    if token_index >= token_order.len() {
+        return;
+    }
+    let token_key = &token_order[token_index];
+    token_index += 1;
     ...
-    while i < parts.len() {
-        let current_part = &parts[i];
-        if is_token[i] {
-            let token_key = &token_order[token_index];
-            token_index += 1;
-            if i == parts.len() - 1 {
-                ...
-            } else {
-                let next_static = &parts[i + 1];
-                ...
-            }
-        } else {
-            if original[pos..].starts_with(current_part) {
-                pos += current_part.len();
-                i += 1;
-            } else {
-                return;
-            }
-        }
-    }
 }
Suggestion importance[1-10]: 8

__

Why: Accessing token_order[token_index] without bounds checking could cause a panic if the vectors become desynchronized. This is a potential runtime safety issue that should be prevented with a bounds check before array access.

Medium
Remove ambiguous type fallback logic

The argument parsing logic for optional parameters at indices 4 and 5 attempts to
handle both f64 and i64 types in either position, but this creates ambiguity.
According to the comment, arg 4 should be frequency_threshold_percentage (double)
and arg 5 should be variable_count_threshold (int). The fallback logic could
misinterpret types if the wrong type appears at the wrong index.

sandbox/plugins/analytics-backend-datafusion/rust/src/udwf/internal_pattern.rs [207-220]

 fn from_args(values: &[ArrayRef]) -> Self {
     ...
     if let Some(arr) = values.get(4) {
         if let Some(v) = read_f64_at(arr, 0) {
             cfg.threshold_percentage = v;
-        } else if let Some(v) = read_i64_at(arr, 0) {
-            cfg.variable_count_threshold = v.max(0) as usize;
         }
     }
     if let Some(arr) = values.get(5) {
         if let Some(v) = read_i64_at(arr, 0) {
             cfg.variable_count_threshold = v.max(0) as usize;
-        } else if let Some(v) = read_f64_at(arr, 0) {
-            cfg.threshold_percentage = v;
         }
     }
     cfg
 }
Suggestion importance[1-10]: 7

__

Why: The current implementation has ambiguous fallback logic that could misinterpret argument types. The comment clearly states arg 4 should be frequency_threshold_percentage (f64) and arg 5 should be variable_count_threshold (i64), but the code tries both types at both positions. Removing the fallback improves correctness and clarity.

Medium
Pre-allocate buffer capacity for performance

The append_strings method allocates a new String for every non-null element via
to_string(). For large datasets this creates significant memory pressure. Consider
using strs.value(i) directly as &str and only allocating when necessary, or
pre-allocating the buffer capacity based on the array length.

sandbox/plugins/analytics-backend-datafusion/rust/src/udaf/internal_pattern.rs [322-330]

 fn append_strings(&mut self, arr: &ArrayRef) -> Result<()> {
     if let Some(strs) = arr.as_any().downcast_ref::<StringArray>() {
+        self.buffer.reserve(strs.len());
         for i in 0..strs.len() {
             if !strs.is_null(i) {
                 self.buffer.push(strs.value(i).to_string());
             }
         }
         return Ok(());
     }
     ...
Suggestion importance[1-10]: 5

__

Why: The suggestion to pre-allocate buffer capacity via self.buffer.reserve(strs.len()) is a valid performance optimization that reduces reallocation overhead when appending many strings. However, the impact is moderate (not critical) and the suggestion doesn't address the to_string() allocation concern mentioned in the rationale.

Low
Guard against empty input corpus

The parse_all_log_patterns method processes the entire log corpus in memory without
any size limit. For very large datasets this can cause memory exhaustion. Consider
adding a configurable corpus size limit and returning an error or truncating the
input when the limit is exceeded, similar to the buffer_limit parameter mentioned in
the aggregate UDF signature.

sandbox/plugins/analytics-backend-datafusion/rust/src/patterns/brain.rs [101-105]

 pub fn parse_all_log_patterns(
     &mut self,
     log_messages: &[String],
     max_sample_count: usize,
 ) -> BrainParseStats {
+    if log_messages.is_empty() {
+        return HashMap::new();
+    }
     let preprocessed = self.preprocess_all_logs(log_messages);
     ...
Suggestion importance[1-10]: 4

__

Why: Adding an empty-input guard is a sensible defensive measure that avoids unnecessary work when log_messages is empty. However, the suggestion's rationale about memory exhaustion and corpus size limits is not addressed by the "improved_code" (which only checks for empty input), making the score moderate.

Low
Validate argument list before rewrite

The PATTERN case unconditionally sets explicitReturnType but doesn't verify that the
aggregate call's argument list matches the expected shape. If the call has
unexpected arguments or types, the rewrite may produce a malformed aggregate. Add a
guard to check call.getArgList() size and types before proceeding with the rewrite.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/PplAggregateCallRewriter.java [99-103]

 case "PATTERN" -> {
-    // PPL declares ARRAY<MAP<VARCHAR, ANY>>; substrait can't carry ANY.
+    if (call.getArgList().isEmpty()) {
+        return call;
+    }
     targetOp = DataFusionFragmentConvertor.LOCAL_INTERNAL_PATTERN_OP;
     explicitReturnType = internalPatternReturnType(agg.getCluster().getTypeFactory());
 }
Suggestion importance[1-10]: 3

__

Why: Adding an empty-argument-list guard is a reasonable defensive check, but the suggestion doesn't address the broader concern about verifying argument types. The improvement is minor and the "improved_code" only adds a single isEmpty() check without validating the expected shape.

Low
Suggestions up to commit 111578e
CategorySuggestion                                                                                                                                    Impact
Possible issue
Fix unsafe byte-to-char casting for UTF-8

Casting raw bytes to char via as char is unsafe for non-ASCII UTF-8 sequences and
can produce invalid Unicode. Use std::str::from_utf8 or iterate over input.chars()
instead of raw bytes to preserve UTF-8 correctness and avoid potential data
corruption.

sandbox/plugins/analytics-backend-datafusion/rust/src/patterns/preprocess.rs [103-125]

 fn apply_generic_number_filter(input: &str) -> String {
-    let bytes = input.as_bytes();
     let mut out = String::with_capacity(input.len());
+    let chars: Vec<char> = input.chars().collect();
     let mut i = 0;
-    while i < bytes.len() {
+    while i < chars.len() {
+        let c = chars[i];
+        let is_digit = c.is_ascii_digit() || c == '-' || c == '+';
+        if is_digit {
+            if i == 0 {
+                out.push(c);
+                i += 1;
+                continue;
+            }
+            let prev = chars[i - 1];
+            ...
+        }
         ...
-        if i == 0 {
-            out.push(c as char);
-            i += 1;
-            continue;
-        }
-        let prev = bytes[i - 1];
+    }
+    out
+}
Suggestion importance[1-10]: 9

__

Why: Casting raw bytes to char via as char is unsafe for non-ASCII UTF-8 and can produce invalid Unicode. The suggestion correctly identifies a critical correctness issue and proposes using chars() iterator to preserve UTF-8 validity, preventing potential data corruption.

High
Validate signed integer bounds before casting

Casting signed integers to usize without validation can cause integer overflow or
produce incorrect values when the input is negative. Add bounds checking to ensure
the value is non-negative and within usize range before casting to prevent runtime
errors or silent data corruption.

sandbox/plugins/analytics-backend-datafusion/rust/src/udaf/internal_pattern.rs [273-285]

 fn scalar_to_usize(scalar: &ScalarValue) -> Result<usize> {
     match scalar {
-        ScalarValue::Int8(Some(v)) => Ok(*v as usize),
-        ScalarValue::Int16(Some(v)) => Ok(*v as usize),
-        ScalarValue::Int32(Some(v)) => Ok(*v as usize),
-        ScalarValue::Int64(Some(v)) => Ok(*v as usize),
+        ScalarValue::Int8(Some(v)) if *v >= 0 => Ok(*v as usize),
+        ScalarValue::Int16(Some(v)) if *v >= 0 => Ok(*v as usize),
+        ScalarValue::Int32(Some(v)) if *v >= 0 => Ok(*v as usize),
+        ScalarValue::Int64(Some(v)) if *v >= 0 => usize::try_from(*v).map_err(|_| 
+            datafusion::common::DataFusionError::Execution(format!("{}: value out of range", NAME))),
         ...
     }
 }
Suggestion importance[1-10]: 8

__

Why: Casting signed integers to usize without validation can cause integer overflow or incorrect values when negative. The suggestion correctly identifies a potential runtime error and proposes bounds checking to prevent silent data corruption or crashes.

Medium
Validate UTF-8 character boundaries

The bytewise substring search and slicing assumes ASCII-compatible input but
operates on UTF-8 byte boundaries without validation. If original contains
multi-byte UTF-8 characters, slicing at arbitrary byte offsets
(original[pos..absolute]) can panic or produce invalid UTF-8. Use
character-boundary-aware indexing or validate that slice boundaries fall on valid
UTF-8 boundaries.

sandbox/plugins/analytics-backend-datafusion/rust/src/patterns/utils.rs [160-168]

-pub fn extract_variables(
-    parse_result: &ParseResult,
-    original: &str,
-    result: &mut TokensMap,
-    _prefix: &str,
-) {
-    ...
-    let original_bytes = original.as_bytes();
-    ...
-    let search_region = &original_bytes[pos..];
-    let next_static_bytes = next_static.as_bytes();
-    match find_subsequence(search_region, next_static_bytes) {
-        None => return,
-        Some(rel_idx) => {
-            let absolute = pos + rel_idx;
+match find_subsequence(search_region, next_static_bytes) {
+    None => return,
+    Some(rel_idx) => {
+        let absolute = pos + rel_idx;
+        if original.is_char_boundary(pos) && original.is_char_boundary(absolute) {
             let value = &original[pos..absolute];
-            ...
+            add_to_result(result, token_key, value.to_string());
+            pos = absolute;
+            i += 1;
+        } else {
+            return;
         }
     }
 }
Suggestion importance[1-10]: 8

__

Why: The code performs byte-level slicing on UTF-8 strings without validating character boundaries, which can cause panics or invalid UTF-8 when multi-byte characters are present. Adding is_char_boundary checks prevents runtime errors and ensures correctness for non-ASCII input.

Medium
Enforce strict parameter type checking

The argument parsing logic for optional parameters at indices 4 and 5 attempts to
handle both f64 and i64 types in either position, but the comment states they are
ordered by argument name (frequency_threshold_percentage first,
variable_count_threshold second). This ambiguous fallback logic could lead to
incorrect parameter assignment if types don't match expectations. Enforce strict
type checking per position or validate the argument order explicitly.

sandbox/plugins/analytics-backend-datafusion/rust/src/udwf/internal_pattern.rs [207-220]

 fn from_args(values: &[ArrayRef]) -> Self {
     ...
+    // arg 4: frequency_threshold_percentage (f64)
     if let Some(arr) = values.get(4) {
         if let Some(v) = read_f64_at(arr, 0) {
             cfg.threshold_percentage = v;
-        } else if let Some(v) = read_i64_at(arr, 0) {
-            cfg.variable_count_threshold = v.max(0) as usize;
         }
     }
+    // arg 5: variable_count_threshold (i64)
     if let Some(arr) = values.get(5) {
         if let Some(v) = read_i64_at(arr, 0) {
             cfg.variable_count_threshold = v.max(0) as usize;
-        } else if let Some(v) = read_f64_at(arr, 0) {
-            cfg.threshold_percentage = v;
         }
     }
     cfg
 }
Suggestion importance[1-10]: 7

__

Why: The current implementation tries both f64 and i64 for positions 4 and 5, which contradicts the comment stating they are ordered by name. This could lead to incorrect parameter assignment if the wrong type is passed. Enforcing strict type checking per position improves correctness and prevents subtle bugs.

Medium
General
Replace panic with error return

Using assert! for input validation causes a panic on invalid input, which crashes
the process. Replace with a Result return type and proper error handling so invalid
thresholds are reported gracefully without terminating execution.

sandbox/plugins/analytics-backend-datafusion/rust/src/patterns/brain.rs [82-87]

-pub fn with_thresholds(variable_count_threshold: usize, threshold_percentage: f64) -> Self {
-    assert!(
-        (0.0..=1.0).contains(&threshold_percentage),
-        "threshold_percentage must be in [0.0, 1.0]"
-    );
-    ...
+pub fn with_thresholds(variable_count_threshold: usize, threshold_percentage: f64) -> Result<Self, String> {
+    if !(0.0..=1.0).contains(&threshold_percentage) {
+        return Err(format!("threshold_percentage must be in [0.0, 1.0], got {}", threshold_percentage));
+    }
+    Ok(Self {
+        variable_count_threshold,
+        threshold_percentage,
+        ...
+    })
 }
Suggestion importance[1-10]: 7

__

Why: Using assert! for input validation causes a panic that crashes the process. The suggestion correctly proposes returning a Result for graceful error handling, which is a better practice for library code. However, this is a constructor-level validation and the impact depends on usage context.

Medium
Remove redundant operand size check

The condition checks original.getOperands().size() == 3 after already validating the
size at the method entry (lines 70-72). This redundant check can be removed since
the method returns early if size is not 3 or 4. Simplify to appendGlobalFlag =
original.getOperator() == SqlLibraryOperators.REGEXP_REPLACE_3; to avoid unnecessary
computation.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/RegexpReplaceAdapter.java [59-62]

-boolean appendGlobalFlag = original.getOperator() == SqlLibraryOperators.REGEXP_REPLACE_3 && original.getOperands().size() == 3;
+boolean appendGlobalFlag = original.getOperator() == SqlLibraryOperators.REGEXP_REPLACE_3;
 
 if (rewrittenPattern == null && rewrittenReplacement == null && !appendGlobalFlag) {
     return original;
 }
Suggestion importance[1-10]: 5

__

Why: The suggestion correctly identifies a redundant check since the method already validates operand size at entry (lines 31-33 in the new hunk). However, the redundancy is minor and doesn't affect correctness—it's a small optimization that improves code clarity.

Low
Validate max_sample_count before parsing

If max_sample_count is set to 0 or a very small value, parse_all_log_patterns may
produce an empty or trivial pattern set, causing all rows to receive empty pattern
strings. Validate that max_sample_count is within a reasonable range (e.g., at least
1) before invoking the parser to prevent degenerate results.

sandbox/plugins/analytics-backend-datafusion/rust/src/udwf/internal_pattern.rs [128-146]

-fn evaluate_all(&mut self, values: &[ArrayRef], num_rows: usize) -> Result<ArrayRef> {
-    if values.is_empty() {
-        return Ok(std::sync::Arc::new(StringArray::from(vec![None::<&str>; num_rows])));
-    }
-    let cfg = WindowConfig::from_args(values);
-    ...
-    let mut parser = BrainLogParser::with_thresholds(
-        cfg.variable_count_threshold,
-        cfg.threshold_percentage,
-    );
-    let stats = parser.parse_all_log_patterns(&messages, cfg.max_sample_count);
+let cfg = WindowConfig::from_args(values);
+if cfg.max_sample_count == 0 {
+    return Ok(std::sync::Arc::new(StringArray::from(vec![""; num_rows])));
+}
+...
+let mut parser = BrainLogParser::with_thresholds(
+    cfg.variable_count_threshold,
+    cfg.threshold_percentage,
+);
+let stats = parser.parse_all_log_patterns(&messages, cfg.max_sample_count);
Suggestion importance[1-10]: 5

__

Why: While validating max_sample_count is reasonable, the suggestion to return empty strings for all rows when it's 0 may not be the desired behavior. The current code already applies .max(0) when parsing the config, and the BRAIN parser likely handles edge cases. This is a minor improvement for defensive programming.

Low
Suggestions up to commit 6058798
CategorySuggestion                                                                                                                                    Impact
Possible issue
Remove ambiguous type fallback logic

The argument parsing logic for optional parameters at indices 4 and 5 attempts to
handle both types in either position, which creates ambiguity. According to the
comment, arg 4 should be frequency_threshold_percentage (f64) and arg 5 should be
variable_count_threshold (i64). The fallback type checks in each branch will cause
incorrect behavior if the types don't match expectations.

sandbox/plugins/analytics-backend-datafusion/rust/src/udwf/internal_pattern.rs [207-220]

 fn from_args(values: &[ArrayRef]) -> Self {
     ...
     if let Some(arr) = values.get(4) {
         if let Some(v) = read_f64_at(arr, 0) {
             cfg.threshold_percentage = v;
-        } else if let Some(v) = read_i64_at(arr, 0) {
-            cfg.variable_count_threshold = v.max(0) as usize;
         }
     }
     if let Some(arr) = values.get(5) {
         if let Some(v) = read_i64_at(arr, 0) {
             cfg.variable_count_threshold = v.max(0) as usize;
-        } else if let Some(v) = read_f64_at(arr, 0) {
-            cfg.threshold_percentage = v;
         }
     }
     cfg
 }
Suggestion importance[1-10]: 8

__

Why: The current implementation has ambiguous fallback logic that attempts to parse both f64 and i64 types for arguments 4 and 5, which contradicts the documented parameter order. This could lead to incorrect configuration when arguments are provided, as the wrong type check might succeed and assign values to the wrong field.

Medium
Handle UTF-8 boundaries correctly

The bytewise substring search assumes ASCII input but the comment acknowledges
non-ASCII data requires char boundary handling. For UTF-8 strings with multi-byte
characters, slicing at arbitrary byte offsets can panic or produce invalid results.
Use str::find on the UTF-8 string instead of byte-level search to ensure
correctness.

sandbox/plugins/analytics-backend-datafusion/rust/src/patterns/utils.rs [183-191]

-fn find_subsequence(haystack: &[u8], needle: &[u8]) -> Option<usize> {
-    if needle.is_empty() {
-        return Some(0);
-    }
-    if needle.len() > haystack.len() {
-        return None;
-    }
-    (0..=haystack.len() - needle.len()).find(|&i| &haystack[i..i + needle.len()] == needle)
+fn find_subsequence_str(haystack: &str, needle: &str) -> Option<usize> {
+    haystack.find(needle)
 }
Suggestion importance[1-10]: 7

__

Why: The bytewise search in find_subsequence can produce incorrect results or panic with non-ASCII UTF-8 input, as acknowledged in the code comments. While the current tests use ASCII data, using str::find would be more robust and correct for general UTF-8 strings.

Medium
General
Validate variable_count_threshold is positive

The variable_count_threshold parameter is not validated. If a caller passes zero or
an unreasonably large value, the BRAIN algorithm's group-token-set logic could
produce incorrect patterns or panic on arithmetic operations. Add a range check to
ensure variable_count_threshold is positive and within a sensible upper bound.

sandbox/plugins/analytics-backend-datafusion/rust/src/patterns/brain.rs [82-96]

 pub fn with_thresholds(variable_count_threshold: usize, threshold_percentage: f64) -> Self {
+    assert!(
+        variable_count_threshold > 0,
+        "variable_count_threshold must be positive"
+    );
     assert!(
         (0.0..=1.0).contains(&threshold_percentage),
         "threshold_percentage must be in [0.0, 1.0]"
     );
     Self {
         variable_count_threshold,
         threshold_percentage,
         ...
     }
 }
Suggestion importance[1-10]: 6

__

Why: A zero or negative variable_count_threshold would break the BRAIN algorithm's group-token-set logic (e.g., the >= threshold check in parse_log_pattern). Adding a positive-value assertion prevents silent failures and improves input validation, though the default value is already safe.

Low
Reject duplicate optional threshold arguments

The optional tail parsing loop assigns threshold_percentage and
variable_count_threshold based on Arrow type, but if both a float and an int literal
appear in the tail, the last one of each type wins. This could silently overwrite
user-provided values if the PPL layer sends duplicate arguments. Add a guard to
detect and reject duplicate assignments.

[sandbox/plugins/...

@ahkcs ahkcs force-pushed the feat/patterns-brain-datafusion branch from 972bc50 to a4d08d8 Compare May 22, 2026 00:00
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit a4d08d8

@github-actions
Copy link
Copy Markdown
Contributor

❌ Gradle check result for a4d08d8: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@ahkcs ahkcs force-pushed the feat/patterns-brain-datafusion branch from a4d08d8 to 5380a05 Compare May 22, 2026 01:06
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 5380a05

@github-actions
Copy link
Copy Markdown
Contributor

❌ Gradle check result for 5380a05: null

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit e6db8ce

@github-actions
Copy link
Copy Markdown
Contributor

✅ Gradle check result for e6db8ce: SUCCESS

@codecov
Copy link
Copy Markdown

codecov Bot commented May 22, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 73.45%. Comparing base (6105940) to head (216ae28).
⚠️ Report is 3 commits behind head on main.

Additional details and impacted files
@@             Coverage Diff              @@
##               main   #21797      +/-   ##
============================================
+ Coverage     73.37%   73.45%   +0.08%     
- Complexity    75448    75532      +84     
============================================
  Files          6034     6033       -1     
  Lines        342504   342572      +68     
  Branches      49259    49276      +17     
============================================
+ Hits         251310   251637     +327     
+ Misses        71175    70947     -228     
+ Partials      20019    19988      -31     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 9d67da9

@github-actions
Copy link
Copy Markdown
Contributor

✅ Gradle check result for 9d67da9: SUCCESS

@ahkcs ahkcs force-pushed the feat/patterns-brain-datafusion branch from 9d67da9 to 9c7d359 Compare May 26, 2026 19:06
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 9c7d359

@github-actions
Copy link
Copy Markdown
Contributor

❌ Gradle check result for 9c7d359: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

ahkcs added a commit to ahkcs/sql that referenced this pull request May 26, 2026
… adapter

Per @penghuo's review: DataFusion-specific concerns shouldn't live in SQL core.
The 'g' flag is needed only because DataFusion's regexp_replace defaults to
first-match-only — Calcite's 3-arg form is already replace-all on both pushdown
and no-pushdown paths.

Restores SQL core, RexStandardizer, the patterns unit test, and the SIMPLE-
patterns explain YAMLs to their upstream/main shape. The 'g' flag is appended
in opensearch-project/OpenSearch#21797's RegexpReplaceAdapter when converting
3-arg REGEXP_REPLACE to DataFusion. Same end-user behavior, smaller SQL diff,
and the Calcite no-pushdown path no longer diverges from the pushdown YAML.

Signed-off-by: Kai Huang <ahkcs@amazon.com>
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit ec6da64

@github-actions
Copy link
Copy Markdown
Contributor

❌ Gradle check result for ec6da64: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

ahkcs added 9 commits May 27, 2026 09:54
…t scaffold

Starts the port of PPL `patterns` command's algorithm layer to native Rust
so the BRAIN method and `PATTERN_PARSER` scalar can execute on the
analytics-engine route. The end goal is `CalcitePPLPatternsIT` passing
against parquet-backed indices.

A new `patterns/` module containing the pure-logic layer (no DataFusion
dependency, fully unit-testable in isolation):

- `preprocess.rs` — Java `BrainLogParser.preprocess(...)` port. Regex-based
  variable detection (IP, ISO datetime, UUID, hex/letter-digit/floats),
  delimiter normalization, whitespace splitting. The "generic number
  surrounded by non-alphanumeric" rule is implemented manually because
  Rust's `regex` crate doesn't support lookaround.

- `utils.rs` — Java `PatternUtils` port. `parse_pattern`, `extract_variables`,
  `ParseResult` with `to_token_order_string`. `WILDCARD_PATTERN` and
  `TOKEN_PATTERN` regexes are equivalent to Java's `Pattern.compile` strings.

- `brain.rs` — `BrainLogParser` struct skeleton, `collapse_continuous_wildcards`,
  `PatternEntry` / `BrainParseStats` typed result types. The classifier
  internals (histogram, group token set, `parse_log_pattern`) are deliberate
  stubs in this milestone — they land in milestone 2.

- `tokens.rs` — `PatternResult` typed view of the per-row / per-group result
  the UDF layer will materialize into Arrow.

14 unit tests pass (1 `#[ignore]` placeholder for the full BRAIN
classification once the algorithm port lands):

- `preprocess_simple_log_line_splits_on_whitespace`
- `preprocess_substitutes_ip_then_blk_number` — matches the expected
  preprocessed shape from `testBrainLabelMode_NotShowNumberedToken`
- `preprocess_substitutes_uuid` — matches `testBrainParseWithUUID_*`
- `preprocess_collapses_consecutive_wildcards_via_number_runs`
- `parse_wildcard_pattern_splits_on_email_separators` —
  `<*>@<*>.<*>` from `testSimplePatternLabelMode_*`
- `to_token_order_string_rewrites_wildcards_to_numbered_tokens` —
  produces `<token1>@<token2>.<token3>`
- `extract_variables_extracts_email_parts` — matches
  `testSimplePatternLabelMode_ShowNumberedToken` ImmutableMap expectation
- `extract_variables_handles_multi_sample_aggregation` — matches
  `testBrainAggregationMode_ShowNumberedToken` multi-sample tokens.list
- `extract_variables_returns_empty_when_static_mismatch`
- `token_pattern_matches_numbered_placeholders`
- `collapse_three_consecutive_wildcards` / variants

1. Full BRAIN classifier port — histogram + group-token-set + parse_log_pattern.
2. DataFusion ScalarUDF wrapper for PATTERN_PARSER (`udf/pattern_parser.rs`).
3. DataFusion AggregateUDF wrapper for INTERNAL_PATTERN (aggregation mode).
4. DataFusion WindowUDF wrapper for INTERNAL_PATTERN (label mode).
5. Substrait YAML signatures for `pattern_parser` and `internal_pattern`.
6. Java adapters in analytics-backend-datafusion + ScalarFunction enum + capability registration.
7. `PatternsCommandIT` mirroring `CalcitePPLPatternsIT` against parquet-backed indices.
8. Verification via `:integTestRemote -Dtests.analytics.force_routing=true`.

This milestone is a no-op at runtime — `patterns/` is unwired. Lands as
the algorithmic foundation for the work above.

Signed-off-by: Kai Huang <ahkcs@amazon.com>
PPL `patterns` lowers its result-struct flatten via `INTERNAL_ITEM(struct, "sample_logs")`
and `INTERNAL_ITEM(struct, "tokens")`, where `sample_logs` returns `ARRAY<VARCHAR>`
and `tokens` returns `MAP<VARCHAR, ARRAY<VARCHAR>>`. The scalar form of ITEM was
already in STANDARD_PROJECT_OPS (with SUPPORTED_FIELD_TYPES — covers VARCHAR /
numeric returns), but the ARRAY- and MAP-returning shapes weren't registered, so
OpenSearchProjectRule rejected the call with
`No backend supports scalar function [ITEM] among [datafusion]`.

Adds ITEM to both ARRAY_RETURNING_PROJECT_OPS and MAP_RETURNING_PROJECT_OPS.
Part of the PPL `patterns` command analytics-engine support stack.

Signed-off-by: Kai Huang <ahkcs@amazon.com>
…ssifier port

Replaces the stub in patterns/brain.rs with a full port of
BrainLogParser.java from the SQL plugin's
common/src/main/java/org/opensearch/sql/common/patterns/.

## What lands

- preprocess_all_logs — tokenizes each line with default filters
  (IP/datetime/UUID/numbers), appends a synthetic logId token, and
  updates token_freq_map.
- process_token_histogram — positional token-frequency counter.
- calculate_group_token_freq — picks the representative WordCombination
  (sorted by same_freq_count desc, then word_freq desc) per row and
  populates the per-(tokens_len,candidate,position) group token set.
- parse_log_pattern — per-row classifier. Tokens whose frequency >
  repFreq are kept ONLY if unique in their group; tokens with frequency
  < repFreq are kept ONLY if the group has fewer than
  variable_count_threshold variants. Everything else becomes <*>.
- parse_all_log_patterns — full pipeline. Group equal pattern strings
  together, count occurrences, collect samples up to max_sample_count.
- WordCombination — typed pair with the Java compareTo ordering.
- collapse_continuous_wildcards — adjacent-<*> collapse, unchanged from
  milestone 1.

## Tests (18/18 pass)

Existing 14 unit tests from milestone 1 still pass. New tests cover
the BRAIN classifier directly with fixtures from CalcitePPLPatternsIT:

- brain_groups_verification_succeeded_lines — two
  'Verification succeeded' lines collapse to one pattern with 2 samples.
- brain_aggregates_hdfs_fixtures_into_four_groups — the 8-line HDFS
  fixture matches the IT's testBrainAggregationMode_NotShowNumberedToken
  expectation: exactly 4 patterns, every group has count == 2.
- brain_aggregates_brain_label_mode_blockstar_into_expected_pattern —
  spot-checks the exact pattern string the IT asserts on row 1 of
  testBrainLabelMode_NotShowNumberedToken:
  'BLOCK* NameSystem.addStoredBlock: blockMap updated: <*IP*> is added
  to blk_<*> size <*>'.

The fixtures match the IT 1:1 — equivalence with the Java implementation
on the test surface that motivated this work is enforced at unit test
time. Subsequent milestones wire the algorithm into DataFusion's
ScalarUDF / AggregateUDF / WindowUDF APIs and registers the
opensearch_scalar_functions.yaml signatures so the analytics-engine
route can dispatch to it.

Signed-off-by: Kai Huang <ahkcs@amazon.com>
…unctions

Adds patterns/eval.rs with the three entry points the PATTERN_PARSER
scalar UDF dispatches between. Mirrors PatternParserFunctionImpl's
evalField / evalAgg / evalSamples (197-line Java class).

- eval_field(pattern, field) — SIMPLE label mode + show_numbered_token.
  Parses the wildcard pattern, extracts field substrings into a token
  map, returns numbered-token rewrite.
- eval_samples(pattern, sample_logs) — SIMPLE aggregation mode +
  show_numbered_token. Token map accumulates across all sample logs.
- eval_agg(field, agg_object, show_numbered_token) — BRAIN label mode.
  Scores each candidate pattern against the preprocessed input tokens,
  picks the highest-similarity candidate, optionally rewrites to
  numbered tokens.

26/26 patterns module tests pass. New cases pin equivalence with the
CalcitePPLPatternsIT expectations directly:

- eval_field_renames_email_wildcards_to_numbered_tokens — matches
  testSimplePatternLabelMode_ShowNumberedToken's <token1>@<token2>.<token3>
  on "amberduke@pyrami.com".
- eval_field_handles_custom_pattern — testSimplePatternLabelModeWithCustomPattern_*
  with the "amberduke<*>" prefix-anchored template.
- eval_samples_accumulates_tokens_across_samples — matches
  testSimplePatternAggregationMode_ShowNumberedToken's 3-sample case.
- eval_agg_picks_best_matching_candidate — best-fit similarity
  scoring against two BRAIN-aggregate candidates.

Next milestone: ScalarUDF wrapper in udf/pattern_parser.rs +
opensearch_scalar_functions.yaml entry + ScalarFunction enum +
Java adapter so the analytics-engine route can dispatch into these
eval functions. Currently unused at runtime.

Signed-off-by: Kai Huang <ahkcs@amazon.com>
…R scalar UDF wiring

Wires the eval functions from milestone 3a into a DataFusion ScalarUDF
and registers all the cross-component plumbing the analytics-engine
route needs to dispatch to it. After this milestone the PPL SIMPLE
patterns + show_numbered_token call shape (evalField / evalSamples) is
reachable from a Calcite RelNode on the analytics-engine path.

- udf/pattern_parser.rs — DataFusion ScalarUDF wrapper. Accepts two
  operand shapes:
    pattern_parser(VARCHAR, VARCHAR)         — evalField
    pattern_parser(VARCHAR, List<VARCHAR>)   — evalSamples
  Return type is Struct<pattern: VARCHAR, tokens: Map<VARCHAR,
  List<VARCHAR>>>. The 3-arg evalAgg shape used by BRAIN label mode
  goes through a separate path (next milestone).

- opensearch_scalar_functions.yaml — substrait  entry,
  return type declared as any1 (same convention json_extract_all uses
  for its concrete Map return).

- ScalarFunction.PATTERN_PARSER — new enum constant in
  analytics-framework.

- PatternParserAdapter — rename adapter (AbstractNameMappingAdapter)
  that routes PPL's INTERNAL_PATTERN_PARSER calls to the locally-
  declared  SqlFunction. The locally-declared operator
  is the referent of the FunctionMappings.s entry that gives isthmus
  the substrait extension name.

- DataFusionAnalyticsBackendPlugin —
    * adapter map: PATTERN_PARSER → new PatternParserAdapter()
    * MAP_RETURNING_PROJECT_OPS: + PATTERN_PARSER (its return type is
      MAP<VARCHAR, ANY> per UserDefinedFunctionUtils.patternStruct)

- DataFusionFragmentConvertor.ADDITIONAL_SCALAR_SIGS —
    FunctionMappings.s(LOCAL_PATTERN_PARSER_OP, "pattern_parser")

Existing 26 patterns module unit tests still pass; new 2 unit tests in
udf::pattern_parser pin the StructArray construction shape:
- struct_data_type_has_pattern_and_tokens_fields
- build_struct_array_populates_pattern_and_tokens_for_email_evalfield

Total Rust crate test pass: 28/28.

Native lib rebuild + IT run-through is the next step in this branch —
to verify the IT pass count moves from 3/14 to ≥ 5/14 (the 2 SIMPLE
patterns+show_numbered_token tests should pass once the runtime
substrait binding succeeds).

- INTERNAL_PATTERN aggregate UDF (3 tests — BRAIN aggregation mode)
- INTERNAL_PATTERN window UDF (3 tests — BRAIN label mode; this UDF is
  the input to PATTERN_PARSER's 3-arg evalAgg shape)
- TAKE aggregate nullability fix (1 test)
- 3-arg evalAgg shape in this UDF (depends on window UDF landing)

Signed-off-by: Kai Huang <ahkcs@amazon.com>
… returns

PPL's flattenParsedPattern wraps INTERNAL_ITEM(struct, key) lookups in
SAFE_CAST whenever the keyed field needs an explicit declared type. The
flatten step targets:
 - SAFE_CAST(ITEM(struct, "pattern"), VARCHAR)        — scalar (covered)
 - SAFE_CAST(ITEM(struct, "pattern_count"), BIGINT)   — scalar (covered)
 - SAFE_CAST(ITEM(struct, "tokens"), MAP<VARCHAR,...>) — needs MAP entry
 - SAFE_CAST(ITEM(struct, "sample_logs"), ARRAY<VAR>) — needs ARRAY entry

SAFE_CAST is already in STANDARD_PROJECT_OPS for SUPPORTED_FIELD_TYPES
(scalar) returns. Add it to ARRAY_RETURNING_PROJECT_OPS and
MAP_RETURNING_PROJECT_OPS so the OpenSearchProjectRule planner check
admits the call when its inferred return type is array- or map-shaped.

This is part of the PPL patterns command analytics-engine support
stack. Effect on CalcitePPLPatternsIT: the 6 SAFE_CAST 'No backend
supports' failures from milestone 3b shift to substrait-binding
errors that need the INTERNAL_PATTERN window / aggregate UDFs to
fully clear.

Signed-off-by: Kai Huang <ahkcs@amazon.com>
…BRAIN aggregation mode)

Implements the BRAIN aggregate side of PPL's `patterns ... method=BRAIN` on
the analytics-engine route. Mirrors `LogPatternAggFunction` on the SQL plugin
side: collects per-group log lines and runs BRAIN over the corpus at
finalize time, emitting

  List<Struct<pattern, pattern_count, tokens, sample_logs>>

so the downstream UNNEST + ITEM projections (PPL's `flattenParsedPattern`)
resolve as named struct-field access.

Wiring (top-down):

- `AggregateFunction` (analytics-framework): adds `PATTERN` enum constant.
  Makes `fromNameOrError` case-insensitive — the PPL operator is registered
  lower-case ("pattern") whereas the enum constants are upper-case, and the
  raw `valueOf("pattern")` lookup was failing.

- `DataFusionAnalyticsBackendPlugin`: declares `AggregateFunction.PATTERN`
  in `AGG_FUNCTIONS` so the capability registry advertises it as supported
  on the datafusion backend.

- `DataFusionFragmentConvertor`: adds `LOCAL_INTERNAL_PATTERN_OP`
  (substrait-bound to `internal_pattern`) and an `ADDITIONAL_AGGREGATE_SIGS`
  entry — same pattern as `LOCAL_TAKE_OP` / `LOCAL_FIRST_OP`.

- `PplAggregateCallRewriter`: rewrites PPL `pattern(...)` calls onto
  `LOCAL_INTERNAL_PATTERN_OP` and substitutes the call's return type with
  the concrete struct shape (PPL's declared `ARRAY<MAP<VARCHAR, ANY>>` has
  an embedded `ANY` that isthmus cannot serialize to Substrait).

- `opensearch_aggregate_functions.yaml`: registers `internal_pattern`
  with 4-arg, 5-arg, 6-arg, and 1-arg (FINAL) overloads matching the PPL
  emitted call shapes (max_sample_count, buffer_limit, show_numbered_token,
  plus optional frequency_threshold_percentage and variable_count_threshold).

- `udaf/internal_pattern.rs`: the actual UDAF — variadic_any signature, an
  Accumulator that collects log lines and runs `BrainLogParser` at
  evaluate() time, with per-shard state shaped as `List<Utf8>` so the
  coordinator's FINAL accumulator can concatenate via `merge_batch`. 3
  unit tests pin behaviour (empty corpus, repeated-pattern grouping,
  cross-shard merge).

Test impact (`CalcitePPLPatternsIT` via analytics-engine route): the three
BRAIN aggregation-mode tests advance past the previous
`AggregateFunction.pattern` enum lookup; they now surface a downstream
"Project rule encountered unmarked child [LogicalCorrelate]" from the
PPL Calcite path's UNNEST after the aggregate (separate planner work to
add Correlate support, not covered by this commit). Window UDF for BRAIN
label mode and the SAFE_CAST nullability fix for SIMPLE aggregation mode
are also pending.

Signed-off-by: Kai Huang <ahkcs@amazon.com>
…LOCAL_ARRAY_AGG_OP

PPL declares TAKE's return type via {@code PPLReturnTypes.ARG0_ARRAY} which
passes {@code nullable=true} to {@code SqlTypeUtil.createArrayType}. The
rewriter clones that nullable type onto the rewritten LOCAL_TAKE_OP call,
but the operator's default {@code ReturnTypes.TO_ARRAY} infers NOT NULL
(because aggregates over a non-empty group can't be null in standard SQL
semantics). The mismatch trips {@code AggregateCall.create}'s validation
with

  type mismatch:
    aggCall type:   VARCHAR ARRAY
    inferred type:  VARCHAR ARRAY NOT NULL

Surface: CalcitePPLPatternsIT's SIMPLE aggregation tests — the Parse-then-
Aggregate path applies TAKE to the source field, and the field's nullable
typing flows into the aggregate's declared return type. The bug is general
though — any nullable-input TAKE rewritten through the analytics-engine
backend would have hit it as soon as the rewriter's explicit-type path
fired.

Fix: andThen FORCE_NULLABLE on both LOCAL_TAKE_OP and LOCAL_ARRAY_AGG_OP so
the operator's inferred type matches what PPL emits. Mirror of the PPL
side's ARG0_ARRAY.

Test impact: CalcitePPLPatternsIT 3/15 → 4/15 (one SIMPLE aggregation test
unblocked; the show_numbered_token variants still hit the separate
"Unable to convert the type ANY" issue from PATTERN_PARSER's MAP<VARCHAR,
ANY> return type).

Signed-off-by: Kai Huang <ahkcs@amazon.com>
…list<string> overload (partial)

Wires up the PATTERN_PARSER ANY-type fixes that the SIMPLE pattern
show_numbered_token tests need, but does NOT yet bring them green —
the wrapping {@code map_extract} + {@code array_element} chain that
ArrayElementAdapter created (when ITEM was lowered against the
original PPL MAP<VARCHAR, ANY> declared type) keeps its frozen
ANY return type even after PATTERN_PARSER is rewritten to STRUCT.
Substrait's TypeConverter still rejects with "Unable to convert the
type ANY" when it walks the operand types of the wrappers.

Captures the necessary framework:

- `opensearch_scalar_functions.yaml`: adds a second
  `pattern_parser(pattern: string, sample_logs: list<string>)`
  overload — the SIMPLE aggregation mode emits this call shape from
  the PPL Calcite visitor's `showNumberedToken=true` branch, and
  without this overload isthmus's ScalarFunctionConverter rejects the
  call earlier with "Unable to convert call pattern_parser(string?,
  list<string?>?)" before even getting to operand-type validation.

- `PatternParserAdapter`: overrides `adapt` to substitute the PPL
  declared MAP<VARCHAR, ANY> return type with the concrete struct
  shape `STRUCT<pattern: VARCHAR, tokens: MAP<VARCHAR, ARRAY<VARCHAR>>>`
  that matches the Rust UDF's Arrow output. Same pattern as the
  PATTERN aggregate's PplAggregateCallRewriter case.

- `ItemTypeRebuilder` (new): pre-isthmus shuttle that walks every
  Project / Filter expression tree. Rebuilds ITEM calls so their
  return type is re-derived from operand 0 (handles the legacy
  ITEM-on-STRUCT case for non-adapted plans), and substitutes any
  pattern_parser call whose declared type is still MAP<VARCHAR, ANY>
  with the concrete struct. Wired into both
  `convertToSubstrait` and `convertStandalone` so attached-wrapper
  paths (PARTIAL agg + FINAL agg conversion) get the same rewrite.

Remaining gap (for follow-up): ArrayElementAdapter has already
converted ITEM on MAP into `array_element(map_extract(map, key), 1)`
with ANY-derived return types BEFORE PatternParserAdapter substitutes
the inner struct type. The wrappers stay typed as ANY ARRAY / ANY,
and isthmus rejects them. Two paths to close this:
  - SQL plugin: change PATTERN_PARSER's declared return type from
    MAP<VARCHAR, ANY> to a concrete struct (touches v2 path's result
    materialisation).
  - Backend: extend ItemTypeRebuilder to detect the
    `array_element(map_extract(STRUCT, key), 1)` anti-pattern and
    rewrite to a direct STRUCT field access.

Test impact (CalcitePPLPatternsIT via analytics-engine route): no
change from previous 4/15 pass count — the SIMPLE pattern
show_numbered_token tests still hit the ANY-typed wrapper chain. The
framework is in place; a follow-up commit closes the remaining gap.

Signed-off-by: Kai Huang <ahkcs@amazon.com>
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 9b60303

Self-check inside ensureMultiShardProvisioned() reads
GET /<index>/_settings and asserts the index settings report
number_of_shards=3. Makes the multi-shard nature of the
aggregation tests provable from the test itself rather than
implicit in the DatasetProvisioner.provision() call.

Per @marc's review request to verify the multi-shard claim.

Signed-off-by: Kai Huang <ahkcs@amazon.com>
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit e39c9ee

ahkcs added a commit to opensearch-project/sql that referenced this pull request May 27, 2026
…#5467)

* feat(api): add PATTERN_* settings defaults to UnifiedQueryContext

PPL `patterns` command's AstBuilder reads cluster settings for method/mode/
max_sample_count/buffer_limit/show_numbered_token defaults when the query
omits them. Without these in the analytics-engine path's settings map, the
parser reads null, falls into `PatternMethod.valueOf("NULL")`, and every
`patterns` query without an explicit `method=` or `mode=` argument fails at
parse time with `No enum constant PatternMethod.NULL`.

Mirrors the OpenSearchSettings defaults (SIMPLE_PATTERN / LABEL / 10 /
100000 / false). Part of the analytics-engine route support for the
`patterns` command.

Signed-off-by: Kai Huang <ahkcs@amazon.com>

* feat(core): emit 4-arg regexp_replace with 'g' flag for SIMPLE patterns

`buildParseRelNode` for `ParseMethod.PATTERNS` lowered through PPL's REPLACE
handler, which always emits Calcite's 3-arg `REGEXP_REPLACE_3`. That works on
the V2 / Calcite path (Calcite's default is replace-all), but the analytics-
engine route converts the call to substrait + DataFusion, and DataFusion's
`regexp_replace` defaults to first-match-only without an explicit "g" flag.

The dashboard test for `source = bank | patterns email mode=label` returned
`<*>@pyrami.com` instead of `<*>@<*>.<*>` because only the first
`[a-zA-Z0-9]+` run was replaced.

Bypass the REPLACE handler for the PATTERNS branch and emit
`REGEXP_REPLACE_PG_4` directly with a constant "g" flag. Same semantics on V2 /
Calcite (Calcite's REGEXP_REPLACE_PG_4 with "g" = replace-all); fixes the
analytics-engine path.

CalcitePPLPatternsTest plan-string expectations updated to match the 4-arg
form. 17/17 unit tests pass. IT result on analytics-engine route:
testSimplePatternLabelMode_NotShowNumberedToken now passes.

Signed-off-by: Kai Huang <ahkcs@amazon.com>

* test(integ-test): add CalcitePPLDashboardPatternsIT pinning BRAIN-label dashboard query

OpenSearch Dashboards renders BRAIN-pattern panels with the shape:

  patterns ... method=BRAIN mode=label
  | stats count() as pattern_count, take(message, 1) as sample_logs
    by patterns_field
  | sort -pattern_count
  | fields patterns_field, pattern_count, sample_logs

This integration test pins that shape on the analytics-engine route so
regressions surface immediately. Schema-only assertions because BRAIN's
clustering output is dataset-version-sensitive — the contract we care about
is "the query plans, executes, and returns three columns in the right order".

Currently red end-to-end pending the BRAIN label window-UDF type-cascade
fix (see the OpenSearch-side WIP commit "BRAIN window UDF + dashboard
query path scaffolding" — the {@code PplWindowCallRewriter} stub
documents the remaining gap).

Signed-off-by: Kai Huang <ahkcs@amazon.com>

* style: apply spotless formatting

Spotless drift from cherry-picking the analytics-engine patterns work
across upstream's recent formatting touch-ups. No behavior change.

Signed-off-by: Kai Huang <huangkaics@gmail.com>
Signed-off-by: Kai Huang <ahkcs@amazon.com>

* test(integ-test): update SIMPLE-patterns explain YAML for 4-arg regexp_replace

CalciteExplainIT's `testPatternsSimplePatternMethodWith{out,AggPushDown}Explain`
expected the old 3-arg `REGEXP_REPLACE(...)` form, but after the `feat(core)`
commit emits 4-arg `REGEXP_REPLACE(..., 'g':VARCHAR)` the plan output now
includes the extra operand both in the logical line and in the base64-encoded
compounded script of the physical/pushdown plan.

Regenerate both YAML expectations against the live planner.

Signed-off-by: Kai Huang <ahkcs@amazon.com>

* fix(opensearch): collapse 4-arg REGEXP_REPLACE_PG_4 'g' to 3-arg at script pushdown

The `feat(core)` commit on this branch lowered PPL `patterns` to a 4-arg
`REGEXP_REPLACE_PG_4(field, pattern, replacement, 'g')` so DataFusion (which
defaults to first-match-only) does global replacement on the analytics-engine
route. Calcite's enumerable runtime — which the V2 / Calcite-pushdown path uses
to compile the serialized RexCall into Janino bytecode — has no matching
`SqlFunctions.regexpReplace(String, String, String, String)` impl (only
`(String, String, String, int[, ...])` variants where the 4th arg is start
position, not a flags string). Janino codegen failed with
`No applicable constructor/method found` for the 4-arg-with-flags call shape,
breaking the patterns.md doctest (`source=apache | patterns message
method=simple_pattern mode=aggregation`).

Two complementary fixes:

1. `RexStandardizer.visitCall`: before serializing for pushdown, collapse
   `REGEXP_REPLACE_PG_4(field, pattern, replacement, 'g')` to the 3-arg
   `REGEXP_REPLACE_3` form. Safe because Calcite's 3-arg variant is already
   replace-all (same semantics as PG_4 with `g`). Only fires when the flags
   literal is exactly `"g"` so any future `i`/`m`/etc. use cases pass through
   untouched.

2. `ExtendedRelJson.toOp`: pass operand count when looking up an operator on
   the deserialization side so multi-arity SQL names (REGEXP_REPLACE_3 vs
   REGEXP_REPLACE_PG_4 vs REGEXP_REPLACE_5 all share `name="REGEXP_REPLACE"`)
   resolve to the right overload. Defensive — the standardizer fix above is
   what actually unblocks the doctest, but the resolver was picking by name
   alone and would have surfaced the same bug for any other overloaded
   builtin.

Verified locally:
- doctest queries (`patterns ... method=simple_pattern mode=aggregation [...]`)
  now return fully-tokenized output;
- `CalcitePPLDashboardPatternsIT` still 1/1 PASS;
- `CalcitePPLPatternsIT` still 10/15 with the same five known-pending failures
  (LogicalCorrelate + `_ShowNumberedToken` BRAIN cases).

Signed-off-by: Kai Huang <ahkcs@amazon.com>

* fix(opensearch): revert arity-aware toOp; restore spath/JSON_EXTRACT doctest

The arity filter added to ExtendedRelJson.toOp in the previous commit broke
SAFE_CAST → JSON_EXTRACT deserialization (used by `spath` lowering): the
PPL JSON_EXTRACT UDF, registered as an anonymous UserDefinedFunctionBuilder
subclass, doesn't expose a meaningful getOperandCountRange(), so my filter
fell through to the firstKindMatch path and skipped the
AvaticaUtils.instantiatePlugin "class" path that previously resolved the
UDF. spath.md doctest started returning RuntimeException on
`source=structured | spath input=doc_n n | eval n=cast(n as int) | stats sum(n)`.

The RexStandardizer collapse (4-arg `REGEXP_REPLACE_PG_4(..., 'g')` → 3-arg
`REGEXP_REPLACE_3`) already fixes the patterns.md doctest at the source side
— by the time pushdown serialization runs, no 4-arg call exists for toOp to
disambiguate. The arity filter was defensive only and no longer carries its
weight; revert toOp to the original first-kind-match behavior, plus a spotless
re-flow that came in with the same change.

Verified locally on a fresh cluster:
- spath.md doctest query → returns sum(n)=6 (was 500).
- patterns.md doctest query → returns fully-tokenized aggregation rows.
- CalcitePPLDashboardPatternsIT → 1/1 PASS.
- CalcitePPLPatternsIT → 10/15 PASS (same baseline; same five known-pending
  BRAIN failures tracked separately).

Signed-off-by: Kai Huang <ahkcs@amazon.com>

* style: trim verbose comments per review

Per @penghuo: drop the verbose multi-line explanatory comments and tighten
the class/method javadoc on the new dashboard IT.

Signed-off-by: Kai Huang <ahkcs@amazon.com>

* test(integ-test): add verifyDataRows to dashboard patterns IT

Per @dai-chen: schema-only verification doesn't catch "query succeeds but
returns 0/wrong rows". Pin the 4 BRAIN clusters with their exact patterns,
counts, and sample logs against the HDFS_LOGS fixture.

Signed-off-by: Kai Huang <ahkcs@amazon.com>

* refactor(core): fuse PATTERNS if-else in buildParseRelNode

Per @dai-chen: the two consecutive `if (PATTERNS)` branches in
buildParseRelNode share a condition; merge into a single if/else with
each branch fully co-located. Pure refactor — CalcitePPLPatternsTest
(logical-plan unit test) passes.

Signed-off-by: Kai Huang <ahkcs@amazon.com>

* test(integ-test): include CalcitePPLDashboardPatternsIT in CalciteNoPushdownIT

Per CLAUDE.md guidance, new Calcite IT classes should be added to the
no-pushdown suite. Verified locally that the dashboard query also passes
with pushdown disabled (Dashboard 1/1, Patterns 10/15 — same baseline).

Signed-off-by: Kai Huang <ahkcs@amazon.com>

* test(integ-test): regenerate agg-push explain YAML for 3-arg REGEXP_REPLACE

The previous YAML capture pre-dated the RexStandardizer 4-arg → 3-arg
collapse landing. With the collapse, the pushed-down compounded script
serializes the 3-arg form (SOURCES has 7 entries, no trailing 'g').

Signed-off-by: Kai Huang <ahkcs@amazon.com>

* revert(core): drop SQL-side 'g' flag for patterns; move to DataFusion adapter

Per @penghuo's review: DataFusion-specific concerns shouldn't live in SQL core.
The 'g' flag is needed only because DataFusion's regexp_replace defaults to
first-match-only — Calcite's 3-arg form is already replace-all on both pushdown
and no-pushdown paths.

Restores SQL core, RexStandardizer, the patterns unit test, and the SIMPLE-
patterns explain YAMLs to their upstream/main shape. The 'g' flag is appended
in opensearch-project/OpenSearch#21797's RegexpReplaceAdapter when converting
3-arg REGEXP_REPLACE to DataFusion. Same end-user behavior, smaller SQL diff,
and the Calcite no-pushdown path no longer diverges from the pushdown YAML.

Signed-off-by: Kai Huang <ahkcs@amazon.com>

* test(api): pin UnifiedQueryContext PATTERN_* defaults via planner test

Per @dai-chen: verify the RelNode produced when `patterns <field>` is run
without explicit method=/mode= args — exercises that the PATTERN_METHOD and
PATTERN_MODE defaults flow through to AstBuilder.visitPatternsCommand and
produce a valid SIMPLE/LABEL lowering with a `patterns_field` projection.

Signed-off-by: Kai Huang <ahkcs@amazon.com>

* style: spotlessApply

Signed-off-by: Kai Huang <ahkcs@amazon.com>

---------

Signed-off-by: Kai Huang <ahkcs@amazon.com>
Signed-off-by: Kai Huang <huangkaics@gmail.com>
@github-actions
Copy link
Copy Markdown
Contributor

PR Code Analyzer ❗

AI-powered 'Code-Diff-Analyzer' found issues on commit f207f61.

PathLineSeverityDescription
sandbox/plugins/analytics-backend-datafusion/rust/src/udaf/internal_pattern.rs29mediumThe `buffer_limit` parameter is explicitly accepted in the UDAF signature for parity with Java but is documented as intentionally ignored ('currently ignored — no partial pre-aggregation'). The accumulator's `buffer` field grows without bound until `evaluate()` is called, meaning a query over a large corpus has no per-shard memory ceiling. An attacker with query access could trigger OOM by submitting a BRAIN aggregate over a sufficiently large index shard. This is a deliberate architectural shortcut, not a bug.
sandbox/plugins/analytics-backend-datafusion/rust/src/patterns/preprocess.rs142lowA deliberately non-matching placeholder regex (`r"(?-u)(?:^^^^^^never^^^^^^)"`) is inserted at index 4 of the DEFAULT_FILTER_PATTERNS slice solely to maintain index-count parity with Java. The surrounding loop explicitly skips index 4. If a future change removes or reorders this skip logic, the nonsensical regex becomes active as a filter rule. The intent is documented but the pattern is fragile.
sandbox/plugins/analytics-backend-datafusion/rust/src/udwf/internal_pattern.rs224lowWindowConfig::from_args attempts to infer whether argument slot 4 is a float or an integer by trying both read_f64_at and read_i64_at in sequence, with no validation that exactly one type matched. If a caller passes an integer value in slot 4 and a float in slot 5, the heuristic could silently swap the semantics of threshold_percentage and variable_count_threshold, producing unexpectedly permissive or restrictive pattern grouping without any error. This is an out-of-place silent type-coercion path in security-adjacent configuration parsing.

The table above displays the top 10 most important findings.

Total: 3 | Critical: 0 | High: 0 | Medium: 1 | Low: 2


Pull Requests Author(s): Please update your Pull Request according to the report above.

Repository Maintainer(s): You can bypass diff analyzer by adding label skip-diff-analyzer after reviewing the changes carefully, then re-run failed actions. To re-enable the analyzer, remove the label, then re-run all actions.


⚠️ Note: The Code-Diff-Analyzer helps protect against potentially harmful code patterns. Please ensure you have thoroughly reviewed the changes beforehand.

Thanks.

ahkcs added 2 commits May 27, 2026 13:44
Per @mch2's nit: pull the kind→name fallback out of OpenSearchProjectRule
into a single WindowFunction.resolveFunction(SqlOperator) so future
backends pick up the OTHER-kind name-lookup logic without copy-paste.

Three unit tests for the new method cover sql-kind hit, OTHER-kind name
fallback (PATTERN), and unknown-operator returning null.

Signed-off-by: Kai Huang <ahkcs@amazon.com>
Per @mch2's review: the prior wording implied SINGLE-on-SINGLETON was
"the only viable choice on a single shard anyway", which is wrong now
that PatternsCommandIT exercises this path on a 3-shard index. Restate:
SINGLE-on-SINGLETON also runs correctly on multi-shard (gather to the
coordinator, then aggregate), it just trades distributed parallelism for
the type-mismatch workaround. Distributed parallelism is still the
follow-up.

Signed-off-by: Kai Huang <ahkcs@amazon.com>
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 5bee96e

@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit ff3830f

…rop ad-hoc check

Per @mch2's review: PERCENTILE_APPROX is semantically a STATE_EXPANDING
aggregate (per-key t-digest state grows with input cardinality), exactly like
PERCENTILE_CONT and PERCENTILE_DISC which are already in the enum. The "right
spot" for it is the AggregateFunction registry, not an ad-hoc string check in
OpenSearchAggregateSplitRule.

Changes:
- AggregateFunction.PERCENTILE_APPROX(Type.STATE_EXPANDING, SqlKind.OTHER).
- OpenSearchAggregateSplitRule: replace the isPercentileApprox + separate
  STATE_EXPANDING checks with a single isStateExpanding(SqlAggFunction) helper
  that handles the unregistered-op throw gracefully (returns false rather
  than crashing the planner — fixes a latent issue where my earlier STATE_EXPANDING
  addition would have crashed on truly unknown aggs).
- Javadoc refreshed to describe the STATE_EXPANDING category rather than
  calling out percentile_approx as a one-off.

Signed-off-by: Kai Huang <ahkcs@amazon.com>
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 4c093f4

Drop AI-generated explanatory blocks; keep terse WHY-only notes
where context isn't obvious from the code itself.

Signed-off-by: Kai Huang <ahkcs@amazon.com>
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 6058798

@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 111578e

@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 6653bde

…olver

Required by sandbox-check's missingJavadoc task on public types.

Signed-off-by: Kai Huang <ahkcs@amazon.com>
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 87a268c

@github-actions
Copy link
Copy Markdown
Contributor

✅ Gradle check result for 87a268c: SUCCESS

@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 4c623c3

@github-actions
Copy link
Copy Markdown
Contributor

✅ Gradle check result for 4c623c3: SUCCESS

@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 216ae28

@github-actions
Copy link
Copy Markdown
Contributor

✅ Gradle check result for 216ae28: SUCCESS

@mch2 mch2 merged commit 15ad045 into opensearch-project:main May 28, 2026
16 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants