From 6eb7be140af1b003a546f8f8ec72e68c6b5c8449 Mon Sep 17 00:00:00 2001 From: Anurag Rai Date: Tue, 12 May 2026 13:50:05 +0530 Subject: [PATCH 1/6] add integration of query scan cost based killing in all server operators --- .../core/accounting/QueryMonitorConfig.java | 3 +- .../pinot/core/operator/BaseOperator.java | 20 +++ .../pinot/core/operator/DocIdSetOperator.java | 18 +++ .../operator/query/AggregationOperator.java | 12 ++ .../core/operator/query/DistinctOperator.java | 12 ++ .../query/FilteredAggregationOperator.java | 12 ++ .../query/FilteredGroupByOperator.java | 12 ++ .../core/operator/query/GroupByOperator.java | 12 ++ .../operator/query/SelectionOnlyOperator.java | 12 ++ .../query/SelectionOrderByOperator.java | 16 +++ ...ectionPartiallyOrderedByDescOperation.java | 12 ++ ...ctionPartiallyOrderedByLinearOperator.java | 12 ++ .../StreamingSelectionOnlyOperator.java | 12 ++ .../executor/ServerQueryExecutorV1Impl.java | 44 ++++++ .../CompositeQueryKillingStrategy.java | 4 +- .../core/query/killing/QueryKillReport.java | 17 ++- .../query/killing/QueryKillingManager.java | 125 ++++++++++++++---- .../query/killing/QueryKillingStrategy.java | 2 +- .../ScanEntriesThresholdStrategy.java | 4 +- .../CompositeQueryKillingStrategyTest.java | 2 +- .../query/killing/QueryKillReportTest.java | 7 + .../killing/QueryKillingManagerTest.java | 4 +- .../ScanEntriesThresholdStrategyTest.java | 4 +- .../starter/helix/BaseServerStarter.java | 11 ++ .../spi/query/QueryExecutionContext.java | 46 +++++++ 25 files changed, 393 insertions(+), 42 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/accounting/QueryMonitorConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/accounting/QueryMonitorConfig.java index 920648ca5061..c5bfcde27adf 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/accounting/QueryMonitorConfig.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/accounting/QueryMonitorConfig.java @@ -158,7 +158,8 @@ public QueryMonitorConfig(PinotConfiguration config, long maxHeapSize) { (String) null); } - QueryMonitorConfig(QueryMonitorConfig oldConfig, Set changedConfigs, Map clusterConfigs) { + public QueryMonitorConfig(QueryMonitorConfig oldConfig, Set changedConfigs, + Map clusterConfigs) { _maxHeapSize = oldConfig._maxHeapSize; if (changedConfigs.contains(Accounting.Keys.MIN_MEMORY_FOOTPRINT_TO_KILL_RATIO)) { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseOperator.java index a99855a07a7a..7805cccefa9e 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseOperator.java @@ -25,6 +25,10 @@ import org.apache.pinot.core.common.Block; import org.apache.pinot.core.common.Operator; import org.apache.pinot.core.plan.ExplainInfo; +import org.apache.pinot.core.query.killing.QueryKillingManager; +import org.apache.pinot.spi.exception.TerminationException; +import org.apache.pinot.spi.query.QueryExecutionContext; +import org.apache.pinot.spi.query.QueryScanCostContext; import org.apache.pinot.spi.query.QueryThreadContext; import org.apache.pinot.spi.trace.InvocationScope; import org.apache.pinot.spi.trace.Tracing; @@ -55,6 +59,22 @@ public ExplainInfo getExplainInfo() { protected void checkTermination() { QueryThreadContext.checkTermination(this::getExplainName); + // Scan-based query killing check + QueryKillingManager killingManager = QueryKillingManager.getInstance(); + if (killingManager != null) { + QueryThreadContext ctx = QueryThreadContext.getIfAvailable(); + if (ctx != null) { + QueryExecutionContext execCtx = ctx.getExecutionContext(); + QueryScanCostContext scanCost = execCtx.getQueryScanCostContext(); + if (scanCost != null) { + killingManager.checkAndKillIfNeeded(execCtx, scanCost); + TerminationException te = execCtx.getTerminateException(); + if (te != null) { + throw te; + } + } + } + } } protected void checkTerminationAndSampleUsage() { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/DocIdSetOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/DocIdSetOperator.java index f7d87453a0f5..6aca0146a29a 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/DocIdSetOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/DocIdSetOperator.java @@ -28,6 +28,7 @@ import org.apache.pinot.core.operator.filter.BaseFilterOperator; import org.apache.pinot.core.plan.DocIdSetPlanNode; import org.apache.pinot.segment.spi.Constants; +import org.apache.pinot.spi.query.QueryScanCostContext; import org.apache.pinot.spi.query.QueryThreadContext; @@ -49,6 +50,7 @@ public class DocIdSetOperator extends BaseDocIdSetOperator { private BlockDocIdSet _blockDocIdSet; private BlockDocIdIterator _blockDocIdIterator; private int _currentDocId = 0; + private long _lastReportedEntriesScanned = 0; public DocIdSetOperator(BaseFilterOperator filterOperator, int maxSizeOfDocIdSet) { Preconditions.checkArgument(maxSizeOfDocIdSet > 0 && maxSizeOfDocIdSet <= DocIdSetPlanNode.MAX_DOC_PER_CALL); @@ -80,12 +82,28 @@ protected DocIdSetBlock getNextBlock() { docIds[pos++] = _currentDocId; } if (pos > 0) { + // Push scan cost delta for proactive query killing + QueryScanCostContext scanCost = getScanCostContext(); + if (scanCost != null) { + long currentTotal = _blockDocIdSet.getNumEntriesScannedInFilter(); + long delta = currentTotal - _lastReportedEntriesScanned; + if (delta > 0) { + scanCost.addEntriesScannedInFilter(delta); + _lastReportedEntriesScanned = currentTotal; + } + } return new DocIdSetBlock(docIds, pos); } else { return null; } } + @javax.annotation.Nullable + private static QueryScanCostContext getScanCostContext() { + QueryThreadContext ctx = QueryThreadContext.getIfAvailable(); + return ctx != null ? ctx.getExecutionContext().getQueryScanCostContext() : null; + } + @Override public String toExplainString() { return EXPLAIN_NAME; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationOperator.java index 31ef246eb328..a5edb35673cf 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationOperator.java @@ -35,6 +35,8 @@ import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils.AggregationInfo; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.core.startree.executor.StarTreeAggregationExecutor; +import org.apache.pinot.spi.query.QueryScanCostContext; +import org.apache.pinot.spi.query.QueryThreadContext; /** @@ -72,6 +74,10 @@ protected AggregationResultsBlock getNextBlock() { ValueBlock valueBlock; while ((valueBlock = _projectOperator.nextBlock()) != null) { _numDocsScanned += valueBlock.getNumDocs(); + QueryScanCostContext scanCost = getScanCostContext(); + if (scanCost != null) { + scanCost.addDocsScanned(valueBlock.getNumDocs()); + } aggregationExecutor.aggregate(valueBlock); } @@ -121,4 +127,10 @@ protected void explainAttributes(ExplainAttributeBuilder attributeBuilder) { .collect(Collectors.toList()); attributeBuilder.putStringList("aggregations", aggregations); } + + @javax.annotation.Nullable + private static QueryScanCostContext getScanCostContext() { + QueryThreadContext ctx = QueryThreadContext.getIfAvailable(); + return ctx != null ? ctx.getExecutionContext().getQueryScanCostContext() : null; + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DistinctOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DistinctOperator.java index 70b51ca458dd..97efd556e3d9 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DistinctOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DistinctOperator.java @@ -33,6 +33,8 @@ import org.apache.pinot.core.query.distinct.DistinctExecutorFactory; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.segment.spi.IndexSegment; +import org.apache.pinot.spi.query.QueryScanCostContext; +import org.apache.pinot.spi.query.QueryThreadContext; /** @@ -60,6 +62,10 @@ protected DistinctResultsBlock getNextBlock() { ValueBlock valueBlock; while ((valueBlock = _projectOperator.nextBlock()) != null) { _numDocsScanned += valueBlock.getNumDocs(); + QueryScanCostContext scanCost = getScanCostContext(); + if (scanCost != null) { + scanCost.addDocsScanned(valueBlock.getNumDocs()); + } if (executor.process(valueBlock)) { break; } @@ -117,4 +123,10 @@ protected void explainAttributes(ExplainAttributeBuilder attributeBuilder) { .collect(Collectors.toList()); attributeBuilder.putStringList("keyColumns", expressions); } + + @javax.annotation.Nullable + private static QueryScanCostContext getScanCostContext() { + QueryThreadContext ctx = QueryThreadContext.getIfAvailable(); + return ctx != null ? ctx.getExecutionContext().getQueryScanCostContext() : null; + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredAggregationOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredAggregationOperator.java index 21697367a5a8..e04760247b48 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredAggregationOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredAggregationOperator.java @@ -35,6 +35,8 @@ import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils.AggregationInfo; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.core.startree.executor.StarTreeAggregationExecutor; +import org.apache.pinot.spi.query.QueryScanCostContext; +import org.apache.pinot.spi.query.QueryThreadContext; /** @@ -95,6 +97,10 @@ protected AggregationResultsBlock getNextBlock() { result[resultIndexMap.get(aggregationFunctions[i])] = filteredResult.get(i); } _numDocsScanned += numDocsScanned; + QueryScanCostContext scanCost = getScanCostContext(); + if (scanCost != null) { + scanCost.addDocsScanned(numDocsScanned); + } _numEntriesScannedInFilter += projectOperator.getExecutionStatistics().getNumEntriesScannedInFilter(); _numEntriesScannedPostFilter += (long) numDocsScanned * projectOperator.getNumColumnsProjected(); } @@ -127,4 +133,10 @@ protected void explainAttributes(ExplainAttributeBuilder attributeBuilder) { attributeBuilder.putStringList("aggregations", aggregations); } } + + @javax.annotation.Nullable + private static QueryScanCostContext getScanCostContext() { + QueryThreadContext ctx = QueryThreadContext.getIfAvailable(); + return ctx != null ? ctx.getExecutionContext().getQueryScanCostContext() : null; + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java index d0555b6e39c2..12fa5e507481 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java @@ -48,6 +48,8 @@ import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.core.startree.executor.StarTreeGroupByExecutor; +import org.apache.pinot.spi.query.QueryScanCostContext; +import org.apache.pinot.spi.query.QueryThreadContext; import org.apache.pinot.spi.trace.Tracing; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -160,6 +162,10 @@ protected GroupByResultsBlock getNextBlock() { } _numDocsScanned += numDocsScanned; + QueryScanCostContext scanCost = getScanCostContext(); + if (scanCost != null) { + scanCost.addDocsScanned(numDocsScanned); + } _numEntriesScannedInFilter += projectOperator.getExecutionStatistics().getNumEntriesScannedInFilter(); _numEntriesScannedPostFilter += (long) numDocsScanned * projectOperator.getNumColumnsProjected(); GroupByResultHolder[] filterGroupByResults = groupByExecutor.getGroupByResultHolders(); @@ -288,4 +294,10 @@ protected void explainAttributes(ExplainAttributeBuilder attributeBuilder) { attributeBuilder.putStringList("aggregations", aggregations); } } + + @javax.annotation.Nullable + private static QueryScanCostContext getScanCostContext() { + QueryThreadContext ctx = QueryThreadContext.getIfAvailable(); + return ctx != null ? ctx.getExecutionContext().getQueryScanCostContext() : null; + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/GroupByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/GroupByOperator.java index bd6f58095f3d..9f1591cea013 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/GroupByOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/GroupByOperator.java @@ -42,6 +42,8 @@ import org.apache.pinot.core.query.aggregation.groupby.GroupByExecutor; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.core.startree.executor.StarTreeGroupByExecutor; +import org.apache.pinot.spi.query.QueryScanCostContext; +import org.apache.pinot.spi.query.QueryThreadContext; import org.apache.pinot.spi.trace.Tracing; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -119,6 +121,10 @@ protected GroupByResultsBlock getNextBlock() { while ((valueBlock = _projectOperator.nextBlock()) != null) { _numDocsScanned += valueBlock.getNumDocs(); + QueryScanCostContext scanCost = getScanCostContext(); + if (scanCost != null) { + scanCost.addDocsScanned(valueBlock.getNumDocs()); + } groupByExecutor.process(valueBlock); } @@ -237,4 +243,10 @@ protected void explainAttributes(ExplainAttributeBuilder attributeBuilder) { .collect(Collectors.toList()); attributeBuilder.putStringList("aggregations", aggregations); } + + @javax.annotation.Nullable + private static QueryScanCostContext getScanCostContext() { + QueryThreadContext ctx = QueryThreadContext.getIfAvailable(); + return ctx != null ? ctx.getExecutionContext().getQueryScanCostContext() : null; + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java index 8527f9dc0329..b58f4c3ea9b9 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java @@ -38,6 +38,8 @@ import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.core.query.selection.SelectionOperatorUtils; import org.apache.pinot.segment.spi.IndexSegment; +import org.apache.pinot.spi.query.QueryScanCostContext; +import org.apache.pinot.spi.query.QueryThreadContext; import org.roaringbitmap.RoaringBitmap; @@ -124,6 +126,10 @@ protected SelectionResultsBlock getNextBlock() { int numDocsToAdd = Math.min(_numRowsToKeep - _rows.size(), valueBlock.getNumDocs()); _rows.ensureCapacity(_rows.size() + numDocsToAdd); _numDocsScanned += numDocsToAdd; + QueryScanCostContext scanCost = getScanCostContext(); + if (scanCost != null) { + scanCost.addDocsScanned(numDocsToAdd); + } if (_nullHandlingEnabled) { for (int i = 0; i < numExpressions; i++) { _nullBitmaps[i] = _blockValSets[i].getNullBitmap(); @@ -168,4 +174,10 @@ public ExecutionStatistics getExecutionStatistics() { return new ExecutionStatistics(_numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter, numTotalDocs); } + + @javax.annotation.Nullable + private static QueryScanCostContext getScanCostContext() { + QueryThreadContext ctx = QueryThreadContext.getIfAvailable(); + return ctx != null ? ctx.getExecutionContext().getQueryScanCostContext() : null; + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java index 5db3e9149306..8961ddfc3752 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java @@ -52,6 +52,8 @@ import org.apache.pinot.core.query.utils.OrderByComparatorFactory; import org.apache.pinot.segment.spi.IndexSegment; import org.apache.pinot.segment.spi.datasource.DataSource; +import org.apache.pinot.spi.query.QueryScanCostContext; +import org.apache.pinot.spi.query.QueryThreadContext; import org.roaringbitmap.RoaringBitmap; @@ -189,6 +191,10 @@ private SelectionResultsBlock computeAllOrdered() { } } _numDocsScanned += numDocsFetched; + QueryScanCostContext scanCost = getScanCostContext(); + if (scanCost != null) { + scanCost.addDocsScanned(numDocsFetched); + } } _numEntriesScannedPostFilter = (long) _numDocsScanned * numColumnsProjected; @@ -253,6 +259,10 @@ private SelectionResultsBlock computePartiallyOrdered() { } } _numDocsScanned += numDocsFetched; + QueryScanCostContext scanCost2 = getScanCostContext(); + if (scanCost2 != null) { + scanCost2.addDocsScanned(numDocsFetched); + } } _numEntriesScannedPostFilter = (long) _numDocsScanned * numColumnsProjected; @@ -368,4 +378,10 @@ public ExecutionStatistics getExecutionStatistics() { return new ExecutionStatistics(_numDocsScanned, numEntriesScannedInFilter, _numEntriesScannedPostFilter, numTotalDocs); } + + @javax.annotation.Nullable + private static QueryScanCostContext getScanCostContext() { + QueryThreadContext ctx = QueryThreadContext.getIfAvailable(); + return ctx != null ? ctx.getExecutionContext().getQueryScanCostContext() : null; + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByDescOperation.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByDescOperation.java index fb318d85be38..97353f542c50 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByDescOperation.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByDescOperation.java @@ -30,6 +30,8 @@ import org.apache.pinot.core.operator.blocks.ValueBlock; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.segment.spi.IndexSegment; +import org.apache.pinot.spi.query.QueryScanCostContext; +import org.apache.pinot.spi.query.QueryThreadContext; /** @@ -71,6 +73,10 @@ protected List fetch(Supplier listBuilderSupplier) { IntFunction rowFetcher = fetchBlock(valueBlock, blockValSets); int numDocsFetched = valueBlock.getNumDocs(); _numDocsScanned += numDocsFetched; + QueryScanCostContext scanCost = getScanCostContext(); + if (scanCost != null) { + scanCost.addDocsScanned(numDocsFetched); + } ListBuilder listBuilder = listBuilderSupplier.get(); // first, calculate the best rows on this block @@ -104,4 +110,10 @@ protected int getNumDocsScanned() { protected String getUpperCaseExplainName() { return EXPLAIN_NAME; } + + @javax.annotation.Nullable + private static QueryScanCostContext getScanCostContext() { + QueryThreadContext ctx = QueryThreadContext.getIfAvailable(); + return ctx != null ? ctx.getExecutionContext().getQueryScanCostContext() : null; + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByLinearOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByLinearOperator.java index eaedac0003ca..95e1a2c66675 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByLinearOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByLinearOperator.java @@ -28,6 +28,8 @@ import org.apache.pinot.core.operator.blocks.ValueBlock; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.segment.spi.IndexSegment; +import org.apache.pinot.spi.query.QueryScanCostContext; +import org.apache.pinot.spi.query.QueryThreadContext; /** @@ -70,6 +72,10 @@ protected List fetch(Supplier listBuilderSupplier) { IntFunction rowFetcher = fetchBlock(valueBlock, blockValSets); int numDocsFetched = valueBlock.getNumDocs(); _numDocsScanned += numDocsFetched; + QueryScanCostContext scanCost = getScanCostContext(); + if (scanCost != null) { + scanCost.addDocsScanned(numDocsFetched); + } for (int i = 0; i < numDocsFetched; i++) { if (listBuilder.add(rowFetcher.apply(i))) { return listBuilder.build(); @@ -88,4 +94,10 @@ public int getNumDocsScanned() { protected String getUpperCaseExplainName() { return EXPLAIN_NAME; } + + @javax.annotation.Nullable + private static QueryScanCostContext getScanCostContext() { + QueryThreadContext ctx = QueryThreadContext.getIfAvailable(); + return ctx != null ? ctx.getExecutionContext().getQueryScanCostContext() : null; + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyOperator.java index bb5171c44d7c..b63f69169aac 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyOperator.java @@ -36,6 +36,8 @@ import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.segment.spi.IndexSegment; +import org.apache.pinot.spi.query.QueryScanCostContext; +import org.apache.pinot.spi.query.QueryThreadContext; import org.roaringbitmap.RoaringBitmap; @@ -122,6 +124,10 @@ protected SelectionResultsBlock getNextBlock() { } } _numDocsScanned += numDocs; + QueryScanCostContext scanCost = getScanCostContext(); + if (scanCost != null) { + scanCost.addDocsScanned(numDocs); + } return new SelectionResultsBlock(_dataSchema, rows, _queryContext); } @@ -162,4 +168,10 @@ public ExecutionStatistics getExecutionStatistics() { return new ExecutionStatistics(_numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter, numTotalDocs); } + + @javax.annotation.Nullable + private static QueryScanCostContext getScanCostContext() { + QueryThreadContext ctx = QueryThreadContext.getIfAvailable(); + return ctx != null ? ctx.getExecutionContext().getQueryScanCostContext() : null; + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java index 6e1ef3c15090..d7a5efcad3ce 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java @@ -59,6 +59,8 @@ import org.apache.pinot.core.plan.maker.PlanMaker; import org.apache.pinot.core.query.aggregation.function.AggregationFunction; import org.apache.pinot.core.query.config.SegmentPrunerConfig; +import org.apache.pinot.core.query.killing.QueryKillingManager; +import org.apache.pinot.core.query.killing.QueryKillingStrategy; import org.apache.pinot.core.query.pruner.SegmentPrunerService; import org.apache.pinot.core.query.pruner.SegmentPrunerStatistics; import org.apache.pinot.core.query.request.ServerQueryRequest; @@ -68,14 +70,19 @@ import org.apache.pinot.core.query.utils.idset.IdSet; import org.apache.pinot.core.util.trace.TraceContext; import org.apache.pinot.segment.local.data.manager.SegmentDataManager; +import org.apache.pinot.segment.local.data.manager.TableDataManager; import org.apache.pinot.segment.spi.AggregationFunctionType; import org.apache.pinot.segment.spi.IndexSegment; import org.apache.pinot.segment.spi.SegmentContext; +import org.apache.pinot.spi.config.table.QueryConfig; +import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.exception.QueryCancelledException; import org.apache.pinot.spi.exception.QueryErrorCode; import org.apache.pinot.spi.exception.QueryException; import org.apache.pinot.spi.plugin.PluginManager; +import org.apache.pinot.spi.query.QueryExecutionContext; +import org.apache.pinot.spi.query.QueryScanCostContext; import org.apache.pinot.spi.query.QueryThreadContext; import org.apache.pinot.spi.trace.Tracer; import org.apache.pinot.spi.trace.Tracing; @@ -176,6 +183,9 @@ private InstanceResponseBlock executeInternal(ServerQueryRequest queryRequest, E queryContext.setEnablePrefetch(_enablePrefetch); + // Initialize scan-based query killing for this query + initScanBasedKilling(queryRequest, tableNameWithType); + // Query scheduler wait time already exceeds query timeout, directly return long querySchedulingTimeMs = System.currentTimeMillis() - queryArrivalTimeMs; if (querySchedulingTimeMs >= queryTimeoutMs) { @@ -520,6 +530,40 @@ private void handleSubquery(ExpressionContext expression, TableExecutionInfo exe } } + /** + * Initializes scan-based query killing for this query. Sets up a {@link QueryScanCostContext} + * on the current thread's {@link QueryExecutionContext} so operators can push scan deltas, and + * caches the resolved per-query strategy so table-level overrides are applied only once. + */ + private void initScanBasedKilling(ServerQueryRequest queryRequest, String tableNameWithType) { + QueryKillingManager killingManager = QueryKillingManager.getInstance(); + if (killingManager == null) { + return; + } + QueryThreadContext ctx = QueryThreadContext.getIfAvailable(); + if (ctx == null) { + return; + } + QueryExecutionContext execCtx = ctx.getExecutionContext(); + execCtx.setTableName(tableNameWithType); + execCtx.setQueryId(queryRequest.getQueryId()); + execCtx.setQueryScanCostContext(new QueryScanCostContext()); + + // Resolve and cache per-query strategy (applies table-level threshold overrides) + QueryConfig queryConfig = null; + TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(tableNameWithType); + if (tableDataManager != null) { + TableConfig tableConfig = tableDataManager.getCachedTableConfigAndSchema().getLeft(); + if (tableConfig != null) { + queryConfig = tableConfig.getQueryConfig(); + } + } + QueryKillingStrategy queryStrategy = killingManager.resolveQueryStrategy(queryConfig); + if (queryStrategy != null) { + execCtx.setCachedKillingStrategy(queryStrategy); + } + } + private void addPrunerStats(InstanceResponseBlock instanceResponse, SegmentPrunerStatistics prunerStats) { instanceResponse.addMetadata(MetadataKey.NUM_SEGMENTS_PRUNED_INVALID.getName(), String.valueOf(prunerStats.getInvalidSegments())); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/killing/CompositeQueryKillingStrategy.java b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/CompositeQueryKillingStrategy.java index aa363d4d3608..fd596ae86fa2 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/killing/CompositeQueryKillingStrategy.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/CompositeQueryKillingStrategy.java @@ -73,10 +73,10 @@ public boolean shouldTerminate(QueryScanCostContext ctx) { @Override public QueryKillReport buildKillReport(QueryScanCostContext ctx, - String queryId, String tableName, String configSource) { + long requestId, String queryId, String tableName, String configSource) { for (QueryKillingStrategy s : _strategies) { if (s.shouldTerminate(ctx)) { - return s.buildKillReport(ctx, queryId, tableName, configSource); + return s.buildKillReport(ctx, requestId, queryId, tableName, configSource); } } throw new IllegalStateException("buildKillReport called but no strategy triggered"); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillReport.java b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillReport.java index d44d873188dc..f98658314248 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillReport.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillReport.java @@ -25,6 +25,7 @@ * Immutable snapshot of a query-kill event */ public final class QueryKillReport { + private final long _requestId; private final String _queryId; private final String _tableName; private final String _strategyName; @@ -40,6 +41,7 @@ public final class QueryKillReport { /** * Creates a {@code QueryKillReport} by snapshotting the current state of {@code context}. * + * @param requestId server request ID for tracing * @param queryId unique identifier of the killed query * @param tableName fully-qualified table name (e.g. {@code myTable_OFFLINE}) * @param strategyName name of the kill strategy that triggered the kill @@ -49,8 +51,9 @@ public final class QueryKillReport { * @param configSource source of the threshold config (e.g. {@code TABLE_CONFIG}) * @param context live scan-cost context; values are snapshotted immediately */ - public QueryKillReport(String queryId, String tableName, String strategyName, String triggeringMetric, + public QueryKillReport(long requestId, String queryId, String tableName, String strategyName, String triggeringMetric, long actualValue, long thresholdValue, String configSource, QueryScanCostContext context) { + _requestId = requestId; _queryId = queryId; _tableName = tableName; _strategyName = strategyName; @@ -67,6 +70,10 @@ public QueryKillReport(String queryId, String tableName, String strategyName, St // ----- Getters ----- + public long getRequestId() { + return _requestId; + } + public String getQueryId() { return _queryId; } @@ -121,13 +128,13 @@ public long getElapsedTimeMs() { */ public String toCustomerMessage() { return String.format( - "Query '%s' on table '%s' was killed because '%s' (%,d) exceeded the threshold (%,d) " + "Query '%s' (requestId=%d) on table '%s' was killed because '%s' (%,d) exceeded the threshold (%,d) " + "configured in %s. " + "At kill time: entriesScannedInFilter=%,d, docsScanned=%,d, " + "entriesScannedPostFilter=%,d, elapsedMs=%d. " + "To reduce scan cost, consider adding a missing index (e.g. inverted or range index) " + "on the filter columns.", - _queryId, _tableName, _triggeringMetric, _actualValue, _thresholdValue, _configSource, + _queryId, _requestId, _tableName, _triggeringMetric, _actualValue, _thresholdValue, _configSource, _snapshotEntriesScannedInFilter, _snapshotDocsScanned, _snapshotEntriesScannedPostFilter, _elapsedTimeMs); } @@ -138,10 +145,10 @@ public String toCustomerMessage() { */ public String toInternalLogMessage() { return String.format( - "QUERY_KILLED queryId=%s table=%s strategy=%s metric=%s actual=%d threshold=%d " + "QUERY_KILLED requestId=%d queryId=%s table=%s strategy=%s metric=%s actual=%d threshold=%d " + "configSource=%s entriesScannedInFilter=%d docsScanned=%d " + "entriesScannedPostFilter=%d elapsedMs=%d", - _queryId, _tableName, _strategyName, _triggeringMetric, _actualValue, _thresholdValue, + _requestId, _queryId, _tableName, _strategyName, _triggeringMetric, _actualValue, _thresholdValue, _configSource, _snapshotEntriesScannedInFilter, _snapshotDocsScanned, _snapshotEntriesScannedPostFilter, _elapsedTimeMs); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingManager.java b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingManager.java index ee8f0678113b..771958fd2676 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingManager.java @@ -18,13 +18,17 @@ */ package org.apache.pinot.core.query.killing; +import java.util.Map; +import java.util.Set; import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; import org.apache.pinot.common.metrics.ServerMeter; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.core.accounting.QueryMonitorConfig; import org.apache.pinot.core.query.killing.strategy.ScanEntriesThresholdStrategy; +import org.apache.pinot.spi.config.provider.PinotClusterConfigChangeListener; import org.apache.pinot.spi.config.table.QueryConfig; +import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.query.QueryExecutionContext; import org.apache.pinot.spi.query.QueryScanCostContext; import org.slf4j.Logger; @@ -35,12 +39,12 @@ * Central manager for scan-based query killing. Owns the guard rails and delegates the * actual kill decision to a {@link QueryKillingStrategy}. * - * The default factory is {@link ScanEntriesThresholdStrategy.Factory}, which reads - * scan thresholds from {@link QueryMonitorConfig}. Custom factories can be configured - * via {@code accounting.scan.based.killing.strategy.factory.class.name}. + *

The strategy is built once at init via a {@link QueryKillingStrategyFactory} and rebuilt when + * cluster config changes via {@link #onChange}. The default factory is + * {@link ScanEntriesThresholdStrategy.Factory}.

* */ -public class QueryKillingManager { +public class QueryKillingManager implements PinotClusterConfigChangeListener { private static final Logger LOGGER = LoggerFactory.getLogger(QueryKillingManager.class); private static volatile QueryKillingManager _instance; @@ -64,10 +68,14 @@ public QueryKillingManager(AtomicReference configRef, Server * Initializes the singleton instance and builds the strategy from config. * Called once during server startup. */ - public static void init(AtomicReference configRef, ServerMetrics serverMetrics) { + public static QueryKillingManager init(PinotConfiguration schedulerConfig, ServerMetrics serverMetrics) { + long maxHeapSize = Runtime.getRuntime().maxMemory(); + QueryMonitorConfig config = new QueryMonitorConfig(schedulerConfig, maxHeapSize); + AtomicReference configRef = new AtomicReference<>(config); QueryKillingManager manager = new QueryKillingManager(configRef, serverMetrics); manager.rebuildStrategy(); _instance = manager; + return manager; } @Nullable @@ -129,6 +137,29 @@ public QueryKillingStrategy getActiveStrategy() { return _strategy; } + /** + * Handles ZK cluster config changes. Rebuilds the {@link QueryMonitorConfig} from the delta + * and refreshes the killing strategy if scan-killing-related keys changed. + */ + @Override + public void onChange(Set changedConfigs, Map clusterConfigs) { + QueryMonitorConfig updated = new QueryMonitorConfig(_configRef.get(), changedConfigs, clusterConfigs); + _configRef.set(updated); + rebuildStrategy(); + } + + /** + * Convenience overload called from {@link org.apache.pinot.core.operator.BaseOperator#checkTermination()}. + * Reads query context (table name, query id, cached strategy) from the execution context. + */ + public void checkAndKillIfNeeded(QueryExecutionContext executionContext, QueryScanCostContext scanCostContext) { + Object cached = executionContext.getCachedKillingStrategy(); + QueryKillingStrategy cachedStrategy = + (cached instanceof QueryKillingStrategy) ? (QueryKillingStrategy) cached : null; + checkAndKillIfNeeded(executionContext, scanCostContext, cachedStrategy, + executionContext.getQueryId(), executionContext.getTableName()); + } + /** * Evaluates whether the query should be killed based on the active strategy. * @@ -138,7 +169,6 @@ public QueryKillingStrategy getActiveStrategy() { public void checkAndKillIfNeeded(QueryExecutionContext executionContext, QueryScanCostContext scanCostContext, String queryId, String tableName, @Nullable QueryConfig queryConfig) { - // no strategy means killing is disabled or unconfigured QueryKillingStrategy strategy = _strategy; if (strategy == null) { return; @@ -149,37 +179,80 @@ public void checkAndKillIfNeeded(QueryExecutionContext executionContext, return; } - // Prevent duplicate kills if (executionContext.getTerminateException() != null) { return; } try { - // Resolve per-query table overrides (returns same instance if no overrides) QueryKillingStrategy queryStrategy = strategy.forQuery(queryConfig, config); - String configSource = (queryStrategy != strategy) ? "table:" + tableName : "cluster"; + checkAndKillWithStrategy(executionContext, scanCostContext, queryStrategy, configSource, queryId, tableName, + config); + } catch (Exception e) { + LOGGER.error("Error in scan-based killing evaluation for query {}", queryId, e); + _serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES_KILLED_SCAN_ERROR, 1); + } + } - // Delegate to strategy - if (!queryStrategy.shouldTerminate(scanCostContext)) { - return; - } - - QueryKillReport report = queryStrategy.buildKillReport( - scanCostContext, queryId, tableName, configSource); - - if (config.isScanBasedKillingLogOnly()) { - LOGGER.info("Query killed in LogOnly mode: {}", report.toInternalLogMessage()); - _serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES_KILLED_SCAN_DRY_RUN, 1); - return; - } + /** + * Resolves a per-query strategy (applying table-level overrides from {@code queryConfig}). + * Returns null if killing is disabled. Caches the resolved strategy on the execution context. + */ + @Nullable + public QueryKillingStrategy resolveQueryStrategy(@Nullable QueryConfig queryConfig) { + QueryKillingStrategy strategy = _strategy; + if (strategy == null) { + return null; + } + QueryMonitorConfig config = _configRef.get(); + if (config == null || !config.isScanBasedKillingEnabled()) { + return null; + } + return strategy.forQuery(queryConfig, config); + } - LOGGER.warn("Query Killed in enforce mode: {}", report.toInternalLogMessage()); - executionContext.terminate(queryStrategy.getErrorCode(), report.toCustomerMessage()); - _serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES_KILLED_SCAN, 1); + private void checkAndKillIfNeeded(QueryExecutionContext executionContext, QueryScanCostContext scanCostContext, + @Nullable QueryKillingStrategy cachedStrategy, @Nullable String queryId, @Nullable String tableName) { + QueryKillingStrategy strategy = cachedStrategy != null ? cachedStrategy : _strategy; + if (strategy == null) { + return; + } + QueryMonitorConfig config = _configRef.get(); + if (config == null || !config.isScanBasedKillingEnabled()) { + return; + } + if (executionContext.getTerminateException() != null) { + return; + } + String resolvedQueryId = queryId != null ? queryId : "unknown"; + String resolvedTableName = tableName != null ? tableName : "unknown"; + String configSource = (cachedStrategy != null && cachedStrategy != _strategy) ? "table:" + resolvedTableName + : "cluster"; + try { + checkAndKillWithStrategy(executionContext, scanCostContext, strategy, configSource, resolvedQueryId, + resolvedTableName, config); } catch (Exception e) { - LOGGER.error("Error in scan-based killing evaluation for query {}", queryId, e); + LOGGER.error("Error in scan-based killing evaluation for query {}", resolvedQueryId, e); _serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES_KILLED_SCAN_ERROR, 1); } } + + private void checkAndKillWithStrategy(QueryExecutionContext executionContext, QueryScanCostContext scanCostContext, + QueryKillingStrategy queryStrategy, String configSource, String queryId, String tableName, + QueryMonitorConfig config) { + if (!queryStrategy.shouldTerminate(scanCostContext)) { + return; + } + long requestId = executionContext.getRequestId(); + QueryKillReport report = queryStrategy.buildKillReport(scanCostContext, requestId, queryId, tableName, + configSource); + if (config.isScanBasedKillingLogOnly()) { + LOGGER.info("Query killed in LogOnly mode: {}", report.toInternalLogMessage()); + _serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES_KILLED_SCAN_DRY_RUN, 1); + return; + } + LOGGER.warn("Query Killed in enforce mode: {}", report.toInternalLogMessage()); + executionContext.terminate(queryStrategy.getErrorCode(), report.toCustomerMessage()); + _serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES_KILLED_SCAN, 1); + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingStrategy.java b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingStrategy.java index 4bf4390d59e9..c5e1248689f8 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingStrategy.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingStrategy.java @@ -47,7 +47,7 @@ public interface QueryKillingStrategy { * Only called when {@link #shouldTerminate} returns true. */ QueryKillReport buildKillReport(QueryScanCostContext context, - String queryId, String tableName, String configSource); + long requestId, String queryId, String tableName, String configSource); /** Error code for the termination response. */ default QueryErrorCode getErrorCode() { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/killing/strategy/ScanEntriesThresholdStrategy.java b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/strategy/ScanEntriesThresholdStrategy.java index 92b13dcdc607..62cc45f287d5 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/killing/strategy/ScanEntriesThresholdStrategy.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/strategy/ScanEntriesThresholdStrategy.java @@ -65,7 +65,7 @@ public boolean shouldTerminate(QueryScanCostContext ctx) { @Override public QueryKillReport buildKillReport(QueryScanCostContext ctx, - String queryId, String tableName, String configSource) { + long requestId, String queryId, String tableName, String configSource) { String triggeringMetric; long actualValue; long thresholdValue; @@ -79,7 +79,7 @@ public QueryKillReport buildKillReport(QueryScanCostContext ctx, actualValue = ctx.getNumDocsScanned(); thresholdValue = _maxDocsScanned; } - return new QueryKillReport(queryId, tableName, STRATEGY_NAME, + return new QueryKillReport(requestId, queryId, tableName, STRATEGY_NAME, triggeringMetric, actualValue, thresholdValue, configSource, ctx); } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/killing/CompositeQueryKillingStrategyTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/killing/CompositeQueryKillingStrategyTest.java index ea48ff498598..6a31a4b8d765 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/query/killing/CompositeQueryKillingStrategyTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/query/killing/CompositeQueryKillingStrategyTest.java @@ -117,7 +117,7 @@ public void testBuildKillReportDelegatesToTriggeringStrategy() { ctx.addDocsScanned(501); assertTrue(composite.shouldTerminate(ctx)); - QueryKillReport report = composite.buildKillReport(ctx, "q1", "t1", "cluster"); + QueryKillReport report = composite.buildKillReport(ctx, 1L, "q1", "t1", "cluster"); assertEquals(report.getTriggeringMetric(), "numDocsScanned"); assertEquals(report.getActualValue(), 501L); } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/killing/QueryKillReportTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/killing/QueryKillReportTest.java index 054197aeaed0..6819a6991be7 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/query/killing/QueryKillReportTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/query/killing/QueryKillReportTest.java @@ -38,6 +38,7 @@ public void testSnapshotsValuesAtCreationTime() { context.addEntriesScannedPostFilter(200L); QueryKillReport report = new QueryKillReport( + 101L, "queryId-1", "myTable_OFFLINE", "EntriesScannedInFilterStrategy", @@ -66,6 +67,7 @@ public void testCustomerMessageContainsAllFields() { context.addEntriesScannedPostFilter(50L); QueryKillReport report = new QueryKillReport( + 202L, "queryId-abc", "salesTable_REALTIME", "EntriesScannedInFilterStrategy", @@ -83,6 +85,7 @@ public void testCustomerMessageContainsAllFields() { assertTrue(msg.contains("1,234,567"), "Should contain actual value with commas"); assertTrue(msg.contains("1,000,000"), "Should contain threshold with commas"); assertTrue(msg.contains("CLUSTER_CONFIG"), "Should contain config source"); + assertTrue(msg.contains("requestId=202"), "Should contain requestId"); assertTrue(msg.contains("missing index") || msg.contains("index"), "Should include advice about missing indexes"); } @@ -95,6 +98,7 @@ public void testInternalLogMessageIsStructured() { context.addEntriesScannedPostFilter(150L); QueryKillReport report = new QueryKillReport( + 303L, "queryId-xyz", "ordersTable_OFFLINE", "DocsScannedStrategy", @@ -108,6 +112,7 @@ public void testInternalLogMessageIsStructured() { String msg = report.toInternalLogMessage(); assertTrue(msg.startsWith("QUERY_KILLED"), "Should start with QUERY_KILLED prefix"); + assertTrue(msg.contains("requestId=303"), "Should have requestId key=value"); assertTrue(msg.contains("queryId=queryId-xyz"), "Should have queryId key=value"); assertTrue(msg.contains("table=ordersTable_OFFLINE"), "Should have table key=value"); assertTrue(msg.contains("metric=numDocsScanned"), "Should have metric key=value"); @@ -124,6 +129,7 @@ public void testGetters() { context.addEntriesScannedPostFilter(25L); QueryKillReport report = new QueryKillReport( + 404L, "queryId-getters", "testTable_OFFLINE", "TestStrategy", @@ -134,6 +140,7 @@ public void testGetters() { context ); + assertEquals(report.getRequestId(), 404L); assertEquals(report.getQueryId(), "queryId-getters"); assertEquals(report.getTableName(), "testTable_OFFLINE"); assertEquals(report.getStrategyName(), "TestStrategy"); diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/killing/QueryKillingManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/killing/QueryKillingManagerTest.java index 5bde697b5a36..fb039aea4349 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/query/killing/QueryKillingManagerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/query/killing/QueryKillingManagerTest.java @@ -308,8 +308,8 @@ public boolean shouldTerminate(QueryScanCostContext context) { @Override public QueryKillReport buildKillReport(QueryScanCostContext context, - String queryId, String tableName, String configSource) { - return new QueryKillReport(queryId, tableName, "AlwaysKillStrategy", + long requestId, String queryId, String tableName, String configSource) { + return new QueryKillReport(requestId, queryId, tableName, "AlwaysKillStrategy", "always", 0, 0, configSource, context); } } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/killing/strategy/ScanEntriesThresholdStrategyTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/killing/strategy/ScanEntriesThresholdStrategyTest.java index 48785498162d..eebef3e58811 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/query/killing/strategy/ScanEntriesThresholdStrategyTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/query/killing/strategy/ScanEntriesThresholdStrategyTest.java @@ -81,7 +81,7 @@ public void testBuildKillReportForEntriesScanned() { QueryScanCostContext ctx = new QueryScanCostContext(); ctx.addEntriesScannedInFilter(150_000_000); - QueryKillReport report = strategy.buildKillReport(ctx, "q1", "myTable", "cluster"); + QueryKillReport report = strategy.buildKillReport(ctx, 1L, "q1", "myTable", "cluster"); assertEquals(report.getTriggeringMetric(), "numEntriesScannedInFilter"); assertEquals(report.getActualValue(), 150_000_000L); assertEquals(report.getThresholdValue(), 100_000_000L); @@ -93,7 +93,7 @@ public void testBuildKillReportForDocsScanned() { QueryScanCostContext ctx = new QueryScanCostContext(); ctx.addDocsScanned(15_000_000); - QueryKillReport report = strategy.buildKillReport(ctx, "q2", "myTable", "table:myTable"); + QueryKillReport report = strategy.buildKillReport(ctx, 2L, "q2", "myTable", "table:myTable"); assertEquals(report.getTriggeringMetric(), "numDocsScanned"); assertEquals(report.getActualValue(), 15_000_000L); assertEquals(report.getThresholdValue(), 10_000_000L); diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java index 638553f05d1e..c30b8817dd85 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java @@ -85,6 +85,7 @@ import org.apache.pinot.core.data.manager.realtime.RealtimeConsumptionRateManager; import org.apache.pinot.core.data.manager.realtime.ServerRateLimitConfigChangeListener; import org.apache.pinot.core.instance.context.ServerContext; +import org.apache.pinot.core.query.killing.QueryKillingManager; import org.apache.pinot.core.query.scheduler.QuerySchedulerThreadPoolConfigChangeListener; import org.apache.pinot.core.query.scheduler.resources.ResourceManager; import org.apache.pinot.core.transport.ListenerConfig; @@ -187,6 +188,7 @@ public abstract class BaseServerStarter implements ServiceStartable { protected PinotEnvironmentProvider _pinotEnvironmentProvider; protected SegmentOperationsThrottlerSet _segmentOperationsThrottlerSet; protected ThreadAccountant _threadAccountant; + protected QueryKillingManager _queryKillingManager; protected DefaultClusterConfigChangeHandler _clusterConfigChangeHandler; protected volatile boolean _isServerReadyToServeQueries = false; protected ScheduledExecutorService _helixMessageCountScheduler; @@ -758,6 +760,10 @@ public void start() _threadAccountant = ThreadAccountantUtils.createAccountant(accountingConfig, _instanceId, org.apache.pinot.spi.config.instance.InstanceType.SERVER); + // Initialize scan-based query killing + PinotConfiguration schedulerConfig = _serverConf.subset(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX); + _queryKillingManager = QueryKillingManager.init(schedulerConfig, ServerMetrics.get()); + SendStatsPredicate sendStatsPredicate = SendStatsPredicate.create(_serverConf, _helixManager); KeepPipelineBreakerStatsPredicate keepPipelineBreakerStatsPredicate = KeepPipelineBreakerStatsPredicate.create(_serverConf); @@ -775,6 +781,11 @@ public void start() new ServerRateLimitConfigChangeListener(_serverMetrics); _clusterConfigChangeHandler.registerClusterConfigChangeListener(serverRateLimitConfigChangeListener); + // Register query killing manager for dynamic config updates (threshold/mode changes via ZK) + if (_queryKillingManager != null) { + _clusterConfigChangeHandler.registerClusterConfigChangeListener(_queryKillingManager); + } + initSegmentFetcher(_serverConf); StateModelFactory stateModelFactory = createSegmentOnlineOfflineStateModelFactory(instanceDataManager, _transitionThreadPoolManager); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryExecutionContext.java b/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryExecutionContext.java index ca970cf5ff19..01a292c6fc07 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryExecutionContext.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryExecutionContext.java @@ -65,6 +65,16 @@ public enum QueryType { private volatile TerminationException _terminateException; + // Scan-based query killing (set per-query when killing is enabled) + @Nullable + private QueryScanCostContext _queryScanCostContext; + @Nullable + private Object _cachedKillingStrategy; + @Nullable + private String _tableName; + @Nullable + private String _queryId; + public QueryExecutionContext(QueryType queryType, long requestId, String cid, String workloadName, long startTimeMs, long activeDeadlineMs, long passiveDeadlineMs, String brokerId, String instanceId, String queryHash) { _queryType = queryType; @@ -193,4 +203,40 @@ public synchronized boolean terminate(QueryErrorCode errorCode, String message) public TerminationException getTerminateException() { return _terminateException; } + + @Nullable + public QueryScanCostContext getQueryScanCostContext() { + return _queryScanCostContext; + } + + public void setQueryScanCostContext(@Nullable QueryScanCostContext queryScanCostContext) { + _queryScanCostContext = queryScanCostContext; + } + + @Nullable + public Object getCachedKillingStrategy() { + return _cachedKillingStrategy; + } + + public void setCachedKillingStrategy(@Nullable Object cachedKillingStrategy) { + _cachedKillingStrategy = cachedKillingStrategy; + } + + @Nullable + public String getTableName() { + return _tableName; + } + + public void setTableName(@Nullable String tableName) { + _tableName = tableName; + } + + @Nullable + public String getQueryId() { + return _queryId; + } + + public void setQueryId(@Nullable String queryId) { + _queryId = queryId; + } } From 30f3a5887edc1043423a4563a98d56a68158f499 Mon Sep 17 00:00:00 2001 From: Anurag Rai Date: Tue, 12 May 2026 15:17:59 +0530 Subject: [PATCH 2/6] fix claude review found issues --- .../pinot/core/operator/BaseOperator.java | 16 ++- .../pinot/core/operator/DocIdSetOperator.java | 6 - .../operator/query/AggregationOperator.java | 7 - .../core/operator/query/DistinctOperator.java | 7 - .../query/FilteredAggregationOperator.java | 7 - .../query/FilteredGroupByOperator.java | 7 - .../core/operator/query/GroupByOperator.java | 7 - .../operator/query/SelectionOnlyOperator.java | 7 - .../query/SelectionOrderByOperator.java | 7 - ...ectionPartiallyOrderedByDescOperation.java | 7 - ...ctionPartiallyOrderedByLinearOperator.java | 7 - .../StreamingSelectionOnlyOperator.java | 7 - .../query/killing/QueryKillingManager.java | 18 ++- .../killing/QueryKillingManagerTest.java | 122 ++++++++++++++++++ .../spi/query/QueryExecutionContext.java | 13 +- 15 files changed, 156 insertions(+), 89 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseOperator.java index 7805cccefa9e..605e4dae91cb 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseOperator.java @@ -59,7 +59,15 @@ public ExplainInfo getExplainInfo() { protected void checkTermination() { QueryThreadContext.checkTermination(this::getExplainName); - // Scan-based query killing check + checkScanBasedKilling(); + } + + protected void checkTerminationAndSampleUsage() { + QueryThreadContext.checkTerminationAndSampleUsage(this::getExplainName); + checkScanBasedKilling(); + } + + private void checkScanBasedKilling() { QueryKillingManager killingManager = QueryKillingManager.getInstance(); if (killingManager != null) { QueryThreadContext ctx = QueryThreadContext.getIfAvailable(); @@ -77,8 +85,10 @@ protected void checkTermination() { } } - protected void checkTerminationAndSampleUsage() { - QueryThreadContext.checkTerminationAndSampleUsage(this::getExplainName); + @javax.annotation.Nullable + protected static QueryScanCostContext getScanCostContext() { + QueryThreadContext ctx = QueryThreadContext.getIfAvailable(); + return ctx != null ? ctx.getExecutionContext().getQueryScanCostContext() : null; } protected List getChildrenExplainInfo() { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/DocIdSetOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/DocIdSetOperator.java index 6aca0146a29a..c96936c36829 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/DocIdSetOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/DocIdSetOperator.java @@ -98,12 +98,6 @@ protected DocIdSetBlock getNextBlock() { } } - @javax.annotation.Nullable - private static QueryScanCostContext getScanCostContext() { - QueryThreadContext ctx = QueryThreadContext.getIfAvailable(); - return ctx != null ? ctx.getExecutionContext().getQueryScanCostContext() : null; - } - @Override public String toExplainString() { return EXPLAIN_NAME; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationOperator.java index a5edb35673cf..90b5f8d1450d 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationOperator.java @@ -36,7 +36,6 @@ import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.core.startree.executor.StarTreeAggregationExecutor; import org.apache.pinot.spi.query.QueryScanCostContext; -import org.apache.pinot.spi.query.QueryThreadContext; /** @@ -127,10 +126,4 @@ protected void explainAttributes(ExplainAttributeBuilder attributeBuilder) { .collect(Collectors.toList()); attributeBuilder.putStringList("aggregations", aggregations); } - - @javax.annotation.Nullable - private static QueryScanCostContext getScanCostContext() { - QueryThreadContext ctx = QueryThreadContext.getIfAvailable(); - return ctx != null ? ctx.getExecutionContext().getQueryScanCostContext() : null; - } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DistinctOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DistinctOperator.java index 97efd556e3d9..9fe9bfd0b468 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DistinctOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DistinctOperator.java @@ -34,7 +34,6 @@ import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.segment.spi.IndexSegment; import org.apache.pinot.spi.query.QueryScanCostContext; -import org.apache.pinot.spi.query.QueryThreadContext; /** @@ -123,10 +122,4 @@ protected void explainAttributes(ExplainAttributeBuilder attributeBuilder) { .collect(Collectors.toList()); attributeBuilder.putStringList("keyColumns", expressions); } - - @javax.annotation.Nullable - private static QueryScanCostContext getScanCostContext() { - QueryThreadContext ctx = QueryThreadContext.getIfAvailable(); - return ctx != null ? ctx.getExecutionContext().getQueryScanCostContext() : null; - } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredAggregationOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredAggregationOperator.java index e04760247b48..4fe179ecc2b3 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredAggregationOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredAggregationOperator.java @@ -36,7 +36,6 @@ import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.core.startree.executor.StarTreeAggregationExecutor; import org.apache.pinot.spi.query.QueryScanCostContext; -import org.apache.pinot.spi.query.QueryThreadContext; /** @@ -133,10 +132,4 @@ protected void explainAttributes(ExplainAttributeBuilder attributeBuilder) { attributeBuilder.putStringList("aggregations", aggregations); } } - - @javax.annotation.Nullable - private static QueryScanCostContext getScanCostContext() { - QueryThreadContext ctx = QueryThreadContext.getIfAvailable(); - return ctx != null ? ctx.getExecutionContext().getQueryScanCostContext() : null; - } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java index 12fa5e507481..4922e06a41a2 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java @@ -49,7 +49,6 @@ import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.core.startree.executor.StarTreeGroupByExecutor; import org.apache.pinot.spi.query.QueryScanCostContext; -import org.apache.pinot.spi.query.QueryThreadContext; import org.apache.pinot.spi.trace.Tracing; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -294,10 +293,4 @@ protected void explainAttributes(ExplainAttributeBuilder attributeBuilder) { attributeBuilder.putStringList("aggregations", aggregations); } } - - @javax.annotation.Nullable - private static QueryScanCostContext getScanCostContext() { - QueryThreadContext ctx = QueryThreadContext.getIfAvailable(); - return ctx != null ? ctx.getExecutionContext().getQueryScanCostContext() : null; - } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/GroupByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/GroupByOperator.java index 9f1591cea013..fabbff2e9c0e 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/GroupByOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/GroupByOperator.java @@ -43,7 +43,6 @@ import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.core.startree.executor.StarTreeGroupByExecutor; import org.apache.pinot.spi.query.QueryScanCostContext; -import org.apache.pinot.spi.query.QueryThreadContext; import org.apache.pinot.spi.trace.Tracing; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -243,10 +242,4 @@ protected void explainAttributes(ExplainAttributeBuilder attributeBuilder) { .collect(Collectors.toList()); attributeBuilder.putStringList("aggregations", aggregations); } - - @javax.annotation.Nullable - private static QueryScanCostContext getScanCostContext() { - QueryThreadContext ctx = QueryThreadContext.getIfAvailable(); - return ctx != null ? ctx.getExecutionContext().getQueryScanCostContext() : null; - } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java index b58f4c3ea9b9..64e0b0219c1a 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java @@ -39,7 +39,6 @@ import org.apache.pinot.core.query.selection.SelectionOperatorUtils; import org.apache.pinot.segment.spi.IndexSegment; import org.apache.pinot.spi.query.QueryScanCostContext; -import org.apache.pinot.spi.query.QueryThreadContext; import org.roaringbitmap.RoaringBitmap; @@ -174,10 +173,4 @@ public ExecutionStatistics getExecutionStatistics() { return new ExecutionStatistics(_numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter, numTotalDocs); } - - @javax.annotation.Nullable - private static QueryScanCostContext getScanCostContext() { - QueryThreadContext ctx = QueryThreadContext.getIfAvailable(); - return ctx != null ? ctx.getExecutionContext().getQueryScanCostContext() : null; - } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java index 8961ddfc3752..e5e01d6f812d 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java @@ -53,7 +53,6 @@ import org.apache.pinot.segment.spi.IndexSegment; import org.apache.pinot.segment.spi.datasource.DataSource; import org.apache.pinot.spi.query.QueryScanCostContext; -import org.apache.pinot.spi.query.QueryThreadContext; import org.roaringbitmap.RoaringBitmap; @@ -378,10 +377,4 @@ public ExecutionStatistics getExecutionStatistics() { return new ExecutionStatistics(_numDocsScanned, numEntriesScannedInFilter, _numEntriesScannedPostFilter, numTotalDocs); } - - @javax.annotation.Nullable - private static QueryScanCostContext getScanCostContext() { - QueryThreadContext ctx = QueryThreadContext.getIfAvailable(); - return ctx != null ? ctx.getExecutionContext().getQueryScanCostContext() : null; - } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByDescOperation.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByDescOperation.java index 97353f542c50..77758887ea86 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByDescOperation.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByDescOperation.java @@ -31,7 +31,6 @@ import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.segment.spi.IndexSegment; import org.apache.pinot.spi.query.QueryScanCostContext; -import org.apache.pinot.spi.query.QueryThreadContext; /** @@ -110,10 +109,4 @@ protected int getNumDocsScanned() { protected String getUpperCaseExplainName() { return EXPLAIN_NAME; } - - @javax.annotation.Nullable - private static QueryScanCostContext getScanCostContext() { - QueryThreadContext ctx = QueryThreadContext.getIfAvailable(); - return ctx != null ? ctx.getExecutionContext().getQueryScanCostContext() : null; - } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByLinearOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByLinearOperator.java index 95e1a2c66675..eddb675f01ef 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByLinearOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByLinearOperator.java @@ -29,7 +29,6 @@ import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.segment.spi.IndexSegment; import org.apache.pinot.spi.query.QueryScanCostContext; -import org.apache.pinot.spi.query.QueryThreadContext; /** @@ -94,10 +93,4 @@ public int getNumDocsScanned() { protected String getUpperCaseExplainName() { return EXPLAIN_NAME; } - - @javax.annotation.Nullable - private static QueryScanCostContext getScanCostContext() { - QueryThreadContext ctx = QueryThreadContext.getIfAvailable(); - return ctx != null ? ctx.getExecutionContext().getQueryScanCostContext() : null; - } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyOperator.java index b63f69169aac..66fa720901b1 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyOperator.java @@ -37,7 +37,6 @@ import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.segment.spi.IndexSegment; import org.apache.pinot.spi.query.QueryScanCostContext; -import org.apache.pinot.spi.query.QueryThreadContext; import org.roaringbitmap.RoaringBitmap; @@ -168,10 +167,4 @@ public ExecutionStatistics getExecutionStatistics() { return new ExecutionStatistics(_numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter, numTotalDocs); } - - @javax.annotation.Nullable - private static QueryScanCostContext getScanCostContext() { - QueryThreadContext ctx = QueryThreadContext.getIfAvailable(); - return ctx != null ? ctx.getExecutionContext().getQueryScanCostContext() : null; - } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingManager.java b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingManager.java index 771958fd2676..b5ced1301e64 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingManager.java @@ -142,7 +142,7 @@ public QueryKillingStrategy getActiveStrategy() { * and refreshes the killing strategy if scan-killing-related keys changed. */ @Override - public void onChange(Set changedConfigs, Map clusterConfigs) { + public synchronized void onChange(Set changedConfigs, Map clusterConfigs) { QueryMonitorConfig updated = new QueryMonitorConfig(_configRef.get(), changedConfigs, clusterConfigs); _configRef.set(updated); rebuildStrategy(); @@ -154,8 +154,15 @@ public void onChange(Set changedConfigs, Map clusterConf */ public void checkAndKillIfNeeded(QueryExecutionContext executionContext, QueryScanCostContext scanCostContext) { Object cached = executionContext.getCachedKillingStrategy(); - QueryKillingStrategy cachedStrategy = - (cached instanceof QueryKillingStrategy) ? (QueryKillingStrategy) cached : null; + QueryKillingStrategy cachedStrategy; + if (cached instanceof QueryKillingStrategy) { + cachedStrategy = (QueryKillingStrategy) cached; + } else { + if (cached != null) { + LOGGER.warn("Unexpected cached killing strategy type: {}", cached.getClass().getName()); + } + cachedStrategy = null; + } checkAndKillIfNeeded(executionContext, scanCostContext, cachedStrategy, executionContext.getQueryId(), executionContext.getTableName()); } @@ -213,7 +220,8 @@ public QueryKillingStrategy resolveQueryStrategy(@Nullable QueryConfig queryConf private void checkAndKillIfNeeded(QueryExecutionContext executionContext, QueryScanCostContext scanCostContext, @Nullable QueryKillingStrategy cachedStrategy, @Nullable String queryId, @Nullable String tableName) { - QueryKillingStrategy strategy = cachedStrategy != null ? cachedStrategy : _strategy; + QueryKillingStrategy currentStrategy = _strategy; + QueryKillingStrategy strategy = cachedStrategy != null ? cachedStrategy : currentStrategy; if (strategy == null) { return; } @@ -226,7 +234,7 @@ private void checkAndKillIfNeeded(QueryExecutionContext executionContext, QueryS } String resolvedQueryId = queryId != null ? queryId : "unknown"; String resolvedTableName = tableName != null ? tableName : "unknown"; - String configSource = (cachedStrategy != null && cachedStrategy != _strategy) ? "table:" + resolvedTableName + String configSource = (cachedStrategy != null && cachedStrategy != currentStrategy) ? "table:" + resolvedTableName : "cluster"; try { checkAndKillWithStrategy(executionContext, scanCostContext, strategy, configSource, resolvedQueryId, diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/killing/QueryKillingManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/killing/QueryKillingManagerTest.java index fb039aea4349..c2d7d4ab2651 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/query/killing/QueryKillingManagerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/query/killing/QueryKillingManagerTest.java @@ -19,7 +19,9 @@ package org.apache.pinot.core.query.killing; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import java.util.concurrent.atomic.AtomicReference; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.core.accounting.QueryMonitorConfig; @@ -295,6 +297,126 @@ public void testRebuildStrategyPicksUpConfigChanges() { assertNotNull(manager.getActiveStrategy(), "After config update, strategy should be built"); } + // --- onChange (dynamic config reload) --- + + @Test + public void testOnChangeRebuildsStrategy() { + // Start with killing disabled + QueryMonitorConfig disabledConfig = buildConfig("disabled", 100L, Long.MAX_VALUE); + AtomicReference configRef = new AtomicReference<>(disabledConfig); + QueryKillingManager manager = new QueryKillingManager(configRef, _serverMetrics); + manager.rebuildStrategy(); + assertNull(manager.getActiveStrategy(), "Strategy should be null when disabled"); + + // Simulate cluster config change enabling killing with enforce mode + threshold + Set changedKeys = new HashSet<>(); + changedKeys.add(CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE); + changedKeys.add(CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_IN_FILTER); + + Map clusterConfigs = new HashMap<>(); + clusterConfigs.put(CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE, "enforce"); + clusterConfigs.put(CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_IN_FILTER, "500"); + + manager.onChange(changedKeys, clusterConfigs); + assertNotNull(manager.getActiveStrategy(), + "Strategy should be rebuilt after onChange enables killing"); + } + + @Test + public void testOnChangeDisablesStrategy() { + // Start with killing enabled + QueryMonitorConfig enabledConfig = buildConfig("enforce", 100L, Long.MAX_VALUE); + AtomicReference configRef = new AtomicReference<>(enabledConfig); + QueryKillingManager manager = new QueryKillingManager(configRef, _serverMetrics); + manager.rebuildStrategy(); + assertNotNull(manager.getActiveStrategy(), "Strategy should be active when enabled"); + + // Simulate cluster config change to disable killing + Set changedKeys = new HashSet<>(); + changedKeys.add(CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE); + + Map clusterConfigs = new HashMap<>(); + clusterConfigs.put(CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE, "disabled"); + + manager.onChange(changedKeys, clusterConfigs); + assertNull(manager.getActiveStrategy(), + "Strategy should be null after onChange disables killing"); + } + + // --- Convenience overload (2-arg checkAndKillIfNeeded) --- + + @Test + public void testConvenienceOverloadKillsViaCachedStrategy() { + // Create a manager with killing enabled, threshold = 50 for entries scanned + QueryMonitorConfig config = buildConfig("enforce", 50L, Long.MAX_VALUE); + AtomicReference configRef = new AtomicReference<>(config); + QueryKillingManager manager = new QueryKillingManager(configRef, _serverMetrics); + manager.rebuildStrategy(); + + // Resolve the per-query strategy and cache it on the execution context + QueryKillingStrategy resolvedStrategy = manager.resolveQueryStrategy(null); + assertNotNull(resolvedStrategy); + + QueryExecutionContext execCtx = QueryExecutionContext.forSseTest(); + execCtx.setTableName("testTable_OFFLINE"); + execCtx.setQueryId("conv-q1"); + execCtx.setCachedKillingStrategy(resolvedStrategy); + + // Create scan cost exceeding the threshold + QueryScanCostContext scanCtx = new QueryScanCostContext(); + scanCtx.addEntriesScannedInFilter(100L); // Above threshold of 50 + + // Use the 2-arg convenience overload + manager.checkAndKillIfNeeded(execCtx, scanCtx); + assertNotNull(execCtx.getTerminateException(), + "Query should be terminated via cached strategy when threshold is exceeded"); + } + + @Test + public void testConvenienceOverloadNullScanCostContextNoOp() { + // When the manager's strategy is null (disabled), calling with null scanCostContext is a no-op. + // In production, BaseOperator guards against null scanCostContext before calling the manager. + // Here we verify the manager's early-return when disabled. + QueryMonitorConfig config = buildConfig("disabled", 100L, Long.MAX_VALUE); + AtomicReference configRef = new AtomicReference<>(config); + QueryKillingManager manager = new QueryKillingManager(configRef, _serverMetrics); + manager.rebuildStrategy(); + + QueryExecutionContext execCtx = QueryExecutionContext.forSseTest(); + // No queryScanCostContext set on execCtx — mirrors the real scenario where + // scan-based killing is not initialized for this query. + + // The 2-arg overload with null scanCostContext is safe when strategy is null (disabled) + manager.checkAndKillIfNeeded(execCtx, null); + assertNull(execCtx.getTerminateException(), + "No exception should be set when killing is disabled and scanCostContext is null"); + } + + // --- resolveQueryStrategy --- + + @Test + public void testResolveQueryStrategyReturnsNullWhenDisabled() { + QueryMonitorConfig config = buildConfig("disabled", 100L, Long.MAX_VALUE); + AtomicReference configRef = new AtomicReference<>(config); + QueryKillingManager manager = new QueryKillingManager(configRef, _serverMetrics); + manager.rebuildStrategy(); + + QueryKillingStrategy resolved = manager.resolveQueryStrategy(null); + assertNull(resolved, "resolveQueryStrategy should return null when killing is disabled"); + } + + @Test + public void testResolveQueryStrategyReturnsStrategyWhenEnabled() { + QueryMonitorConfig config = buildConfig("enforce", 200L, Long.MAX_VALUE); + AtomicReference configRef = new AtomicReference<>(config); + QueryKillingManager manager = new QueryKillingManager(configRef, _serverMetrics); + manager.rebuildStrategy(); + + QueryKillingStrategy resolved = manager.resolveQueryStrategy(null); + assertNotNull(resolved, "resolveQueryStrategy should return a strategy when killing is enabled"); + assertTrue(resolved instanceof ScanEntriesThresholdStrategy); + } + // --- Test fixtures for pluggable strategy --- /** diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryExecutionContext.java b/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryExecutionContext.java index 01a292c6fc07..b4e8d625d598 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryExecutionContext.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryExecutionContext.java @@ -65,15 +65,18 @@ public enum QueryType { private volatile TerminationException _terminateException; - // Scan-based query killing (set per-query when killing is enabled) + /// Per-query scan cost accumulators for scan-based killing, tracking cumulative scan cost across all segments. @Nullable - private QueryScanCostContext _queryScanCostContext; + private volatile QueryScanCostContext _queryScanCostContext; + @Nullable - private Object _cachedKillingStrategy; + private volatile Object _cachedKillingStrategy; + @Nullable - private String _tableName; + private volatile String _tableName; + @Nullable - private String _queryId; + private volatile String _queryId; public QueryExecutionContext(QueryType queryType, long requestId, String cid, String workloadName, long startTimeMs, long activeDeadlineMs, long passiveDeadlineMs, String brokerId, String instanceId, String queryHash) { From c3590c27e129530d0794ce44b53393736edb8c33 Mon Sep 17 00:00:00 2001 From: Anurag Rai Date: Wed, 13 May 2026 10:33:48 +0530 Subject: [PATCH 3/6] add entries scanned post filter instrumentation in query killing --- .../operator/query/AggregationOperator.java | 2 + .../core/operator/query/DistinctOperator.java | 2 + .../query/FilteredAggregationOperator.java | 2 + .../query/FilteredGroupByOperator.java | 2 + .../core/operator/query/GroupByOperator.java | 2 + .../operator/query/SelectionOnlyOperator.java | 2 + .../query/SelectionOrderByOperator.java | 2 + ...ectionPartiallyOrderedByDescOperation.java | 2 + ...ctionPartiallyOrderedByLinearOperator.java | 2 + .../StreamingSelectionOnlyOperator.java | 2 + .../ScanEntriesThresholdStrategy.java | 42 +++- .../query/OperatorScanCostTrackingTest.java | 207 ++++++++++++++++++ .../ScanEntriesThresholdStrategyTest.java | 58 +++++ 13 files changed, 318 insertions(+), 9 deletions(-) create mode 100644 pinot-core/src/test/java/org/apache/pinot/core/operator/query/OperatorScanCostTrackingTest.java diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationOperator.java index 90b5f8d1450d..5035f6ea63c7 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationOperator.java @@ -76,6 +76,8 @@ protected AggregationResultsBlock getNextBlock() { QueryScanCostContext scanCost = getScanCostContext(); if (scanCost != null) { scanCost.addDocsScanned(valueBlock.getNumDocs()); + scanCost.addEntriesScannedPostFilter( + (long) valueBlock.getNumDocs() * _projectOperator.getNumColumnsProjected()); } aggregationExecutor.aggregate(valueBlock); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DistinctOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DistinctOperator.java index 9fe9bfd0b468..93c00d76bde7 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DistinctOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DistinctOperator.java @@ -64,6 +64,8 @@ protected DistinctResultsBlock getNextBlock() { QueryScanCostContext scanCost = getScanCostContext(); if (scanCost != null) { scanCost.addDocsScanned(valueBlock.getNumDocs()); + scanCost.addEntriesScannedPostFilter( + (long) valueBlock.getNumDocs() * _projectOperator.getNumColumnsProjected()); } if (executor.process(valueBlock)) { break; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredAggregationOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredAggregationOperator.java index 4fe179ecc2b3..bfc56b025221 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredAggregationOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredAggregationOperator.java @@ -99,6 +99,8 @@ protected AggregationResultsBlock getNextBlock() { QueryScanCostContext scanCost = getScanCostContext(); if (scanCost != null) { scanCost.addDocsScanned(numDocsScanned); + scanCost.addEntriesScannedPostFilter( + (long) numDocsScanned * projectOperator.getNumColumnsProjected()); } _numEntriesScannedInFilter += projectOperator.getExecutionStatistics().getNumEntriesScannedInFilter(); _numEntriesScannedPostFilter += (long) numDocsScanned * projectOperator.getNumColumnsProjected(); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java index 4922e06a41a2..0cb56dfe5eb1 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java @@ -164,6 +164,8 @@ protected GroupByResultsBlock getNextBlock() { QueryScanCostContext scanCost = getScanCostContext(); if (scanCost != null) { scanCost.addDocsScanned(numDocsScanned); + scanCost.addEntriesScannedPostFilter( + (long) numDocsScanned * projectOperator.getNumColumnsProjected()); } _numEntriesScannedInFilter += projectOperator.getExecutionStatistics().getNumEntriesScannedInFilter(); _numEntriesScannedPostFilter += (long) numDocsScanned * projectOperator.getNumColumnsProjected(); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/GroupByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/GroupByOperator.java index fabbff2e9c0e..3465484b6521 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/GroupByOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/GroupByOperator.java @@ -123,6 +123,8 @@ protected GroupByResultsBlock getNextBlock() { QueryScanCostContext scanCost = getScanCostContext(); if (scanCost != null) { scanCost.addDocsScanned(valueBlock.getNumDocs()); + scanCost.addEntriesScannedPostFilter( + (long) valueBlock.getNumDocs() * _projectOperator.getNumColumnsProjected()); } groupByExecutor.process(valueBlock); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java index 64e0b0219c1a..6d7d6bac9d11 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java @@ -128,6 +128,8 @@ protected SelectionResultsBlock getNextBlock() { QueryScanCostContext scanCost = getScanCostContext(); if (scanCost != null) { scanCost.addDocsScanned(numDocsToAdd); + scanCost.addEntriesScannedPostFilter( + (long) numDocsToAdd * _projectOperator.getNumColumnsProjected()); } if (_nullHandlingEnabled) { for (int i = 0; i < numExpressions; i++) { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java index e5e01d6f812d..d088d721abd7 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java @@ -193,6 +193,7 @@ private SelectionResultsBlock computeAllOrdered() { QueryScanCostContext scanCost = getScanCostContext(); if (scanCost != null) { scanCost.addDocsScanned(numDocsFetched); + scanCost.addEntriesScannedPostFilter((long) numDocsFetched * numColumnsProjected); } } _numEntriesScannedPostFilter = (long) _numDocsScanned * numColumnsProjected; @@ -261,6 +262,7 @@ private SelectionResultsBlock computePartiallyOrdered() { QueryScanCostContext scanCost2 = getScanCostContext(); if (scanCost2 != null) { scanCost2.addDocsScanned(numDocsFetched); + scanCost2.addEntriesScannedPostFilter((long) numDocsFetched * numColumnsProjected); } } _numEntriesScannedPostFilter = (long) _numDocsScanned * numColumnsProjected; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByDescOperation.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByDescOperation.java index 77758887ea86..df4f624dedf6 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByDescOperation.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByDescOperation.java @@ -75,6 +75,8 @@ protected List fetch(Supplier listBuilderSupplier) { QueryScanCostContext scanCost = getScanCostContext(); if (scanCost != null) { scanCost.addDocsScanned(numDocsFetched); + scanCost.addEntriesScannedPostFilter( + (long) numDocsFetched * _projectOperator.getNumColumnsProjected()); } ListBuilder listBuilder = listBuilderSupplier.get(); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByLinearOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByLinearOperator.java index eddb675f01ef..57237fc1ebc5 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByLinearOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByLinearOperator.java @@ -74,6 +74,8 @@ protected List fetch(Supplier listBuilderSupplier) { QueryScanCostContext scanCost = getScanCostContext(); if (scanCost != null) { scanCost.addDocsScanned(numDocsFetched); + scanCost.addEntriesScannedPostFilter( + (long) numDocsFetched * _projectOperator.getNumColumnsProjected()); } for (int i = 0; i < numDocsFetched; i++) { if (listBuilder.add(rowFetcher.apply(i))) { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyOperator.java index 66fa720901b1..9c498a267164 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyOperator.java @@ -126,6 +126,8 @@ protected SelectionResultsBlock getNextBlock() { QueryScanCostContext scanCost = getScanCostContext(); if (scanCost != null) { scanCost.addDocsScanned(numDocs); + scanCost.addEntriesScannedPostFilter( + (long) numDocs * _projectOperator.getNumColumnsProjected()); } return new SelectionResultsBlock(_dataSchema, rows, _queryContext); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/killing/strategy/ScanEntriesThresholdStrategy.java b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/strategy/ScanEntriesThresholdStrategy.java index 62cc45f287d5..12adba92f13b 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/killing/strategy/ScanEntriesThresholdStrategy.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/strategy/ScanEntriesThresholdStrategy.java @@ -49,10 +49,17 @@ public class ScanEntriesThresholdStrategy implements QueryKillingStrategy { private final long _maxEntriesScannedInFilter; private final long _maxDocsScanned; + private final long _maxEntriesScannedPostFilter; public ScanEntriesThresholdStrategy(long maxEntriesScannedInFilter, long maxDocsScanned) { + this(maxEntriesScannedInFilter, maxDocsScanned, Long.MAX_VALUE); + } + + public ScanEntriesThresholdStrategy(long maxEntriesScannedInFilter, long maxDocsScanned, + long maxEntriesScannedPostFilter) { _maxEntriesScannedInFilter = maxEntriesScannedInFilter; _maxDocsScanned = maxDocsScanned; + _maxEntriesScannedPostFilter = maxEntriesScannedPostFilter; } @Override @@ -60,7 +67,9 @@ public boolean shouldTerminate(QueryScanCostContext ctx) { return (_maxEntriesScannedInFilter < Long.MAX_VALUE && ctx.getNumEntriesScannedInFilter() > _maxEntriesScannedInFilter) || (_maxDocsScanned < Long.MAX_VALUE - && ctx.getNumDocsScanned() > _maxDocsScanned); + && ctx.getNumDocsScanned() > _maxDocsScanned) + || (_maxEntriesScannedPostFilter < Long.MAX_VALUE + && ctx.getNumEntriesScannedPostFilter() > _maxEntriesScannedPostFilter); } @Override @@ -74,10 +83,15 @@ public QueryKillReport buildKillReport(QueryScanCostContext ctx, triggeringMetric = "numEntriesScannedInFilter"; actualValue = ctx.getNumEntriesScannedInFilter(); thresholdValue = _maxEntriesScannedInFilter; - } else { + } else if (_maxDocsScanned < Long.MAX_VALUE + && ctx.getNumDocsScanned() > _maxDocsScanned) { triggeringMetric = "numDocsScanned"; actualValue = ctx.getNumDocsScanned(); thresholdValue = _maxDocsScanned; + } else { + triggeringMetric = "numEntriesScannedPostFilter"; + actualValue = ctx.getNumEntriesScannedPostFilter(); + thresholdValue = _maxEntriesScannedPostFilter; } return new QueryKillReport(requestId, queryId, tableName, STRATEGY_NAME, triggeringMetric, actualValue, thresholdValue, configSource, ctx); @@ -101,12 +115,14 @@ public QueryKillingStrategy forQuery(@Nullable QueryConfig queryConfig, } Long tableEntries = queryConfig.getMaxEntriesScannedInFilter(); Long tableDocs = queryConfig.getMaxDocsScanned(); - if (tableEntries == null && tableDocs == null) { + Long tablePostFilter = queryConfig.getMaxEntriesScannedPostFilter(); + if (tableEntries == null && tableDocs == null && tablePostFilter == null) { return this; } return new ScanEntriesThresholdStrategy( tableEntries != null ? tableEntries : _maxEntriesScannedInFilter, - tableDocs != null ? tableDocs : _maxDocsScanned); + tableDocs != null ? tableDocs : _maxDocsScanned, + tablePostFilter != null ? tablePostFilter : _maxEntriesScannedPostFilter); } public long getMaxEntriesScannedInFilter() { @@ -117,6 +133,10 @@ public long getMaxDocsScanned() { return _maxDocsScanned; } + public long getMaxEntriesScannedPostFilter() { + return _maxEntriesScannedPostFilter; + } + /** * Factory that creates a {@link ScanEntriesThresholdStrategy} from * {@link QueryMonitorConfig}. This is the default factory used when no custom @@ -134,19 +154,23 @@ public static class Factory implements QueryKillingStrategyFactory { public QueryKillingStrategy create(QueryMonitorConfig config) { long maxEntries = config.getScanBasedKillingMaxEntriesScannedInFilter(); long maxDocs = config.getScanBasedKillingMaxDocsScanned(); + long maxPostFilter = config.getScanBasedKillingMaxEntriesScannedPostFilter(); - if (maxEntries == Long.MAX_VALUE && maxDocs == Long.MAX_VALUE) { + if (maxEntries == Long.MAX_VALUE && maxDocs == Long.MAX_VALUE && maxPostFilter == Long.MAX_VALUE) { LOGGER.warn("Scan-based killing is enabled but no thresholds are configured. " + "Set at least one of: accounting.scan.based.killing.max.entries.scanned.in.filter, " - + "accounting.scan.based.killing.max.docs.scanned. " + + "accounting.scan.based.killing.max.docs.scanned, " + + "accounting.scan.based.killing.max.entries.scanned.post.filter. " + "Scan-based killing will be effectively disabled until thresholds are set."); return null; } - LOGGER.info("Initialized ScanEntriesThresholdStrategy with maxEntriesScannedInFilter={}, maxDocsScanned={}", + LOGGER.info("Initialized ScanEntriesThresholdStrategy with maxEntriesScannedInFilter={}, " + + "maxDocsScanned={}, maxEntriesScannedPostFilter={}", maxEntries == Long.MAX_VALUE ? "disabled" : maxEntries, - maxDocs == Long.MAX_VALUE ? "disabled" : maxDocs); - return new ScanEntriesThresholdStrategy(maxEntries, maxDocs); + maxDocs == Long.MAX_VALUE ? "disabled" : maxDocs, + maxPostFilter == Long.MAX_VALUE ? "disabled" : maxPostFilter); + return new ScanEntriesThresholdStrategy(maxEntries, maxDocs, maxPostFilter); } @Override diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/query/OperatorScanCostTrackingTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/query/OperatorScanCostTrackingTest.java new file mode 100644 index 000000000000..cf180bf85f2a --- /dev/null +++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/query/OperatorScanCostTrackingTest.java @@ -0,0 +1,207 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.operator.query; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.List; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.core.operator.BaseProjectOperator; +import org.apache.pinot.core.plan.DocIdSetPlanNode; +import org.apache.pinot.core.plan.ProjectPlanNode; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils; +import org.apache.pinot.core.query.selection.SelectionOperatorUtils; +import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader; +import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl; +import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader; +import org.apache.pinot.segment.spi.IndexSegment; +import org.apache.pinot.segment.spi.SegmentContext; +import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.query.QueryScanCostContext; +import org.apache.pinot.spi.query.QueryThreadContext; +import org.apache.pinot.spi.utils.ReadMode; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + + +/** + * Verifies that query operators correctly push scan cost metrics into {@link QueryScanCostContext} + */ +public class OperatorScanCostTrackingTest { + private static final String RAW_TABLE_NAME = "scanCostTestTable"; + private static final String SEGMENT_NAME = "scanCostTestSegment"; + private static final int NUM_ROWS = 100; + private static final String COL_INT = "intCol"; + private static final String COL_STRING = "stringCol"; + private static final String COL_DOUBLE = "doubleCol"; + + private static final TableConfig TABLE_CONFIG = + new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build(); + private static final Schema SCHEMA = new Schema.SchemaBuilder() + .addSingleValueDimension(COL_INT, FieldSpec.DataType.INT) + .addSingleValueDimension(COL_STRING, FieldSpec.DataType.STRING) + .addMetric(COL_DOUBLE, FieldSpec.DataType.DOUBLE) + .build(); + + private File _tempDir; + private IndexSegment _segment; + + @BeforeClass + public void setUp() + throws Exception { + _tempDir = Files.createTempDirectory("OperatorScanCostTrackingTest").toFile(); + + List records = new ArrayList<>(NUM_ROWS); + for (int i = 0; i < NUM_ROWS; i++) { + GenericRow row = new GenericRow(); + row.putValue(COL_INT, i); + row.putValue(COL_STRING, "val_" + (i % 10)); + row.putValue(COL_DOUBLE, i * 1.5); + records.add(row); + } + + SegmentGeneratorConfig config = new SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA); + config.setTableName(RAW_TABLE_NAME); + config.setSegmentName(SEGMENT_NAME); + config.setOutDir(_tempDir.getPath()); + + SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl(); + driver.init(config, new GenericRowRecordReader(records)); + driver.build(); + + _segment = ImmutableSegmentLoader.load(new File(_tempDir, SEGMENT_NAME), ReadMode.mmap); + } + + @Test + public void testSelectionOnlyOperatorTracksScanCost() { + QueryContext queryContext = QueryContextConverterUtils.getQueryContext( + "SELECT intCol, stringCol, doubleCol FROM scanCostTestTable LIMIT 50"); + + try (QueryThreadContext ignore = QueryThreadContext.openForSseTest()) { + QueryScanCostContext scanCost = new QueryScanCostContext(); + QueryThreadContext.get().getExecutionContext().setQueryScanCostContext(scanCost); + + List expressions = + SelectionOperatorUtils.extractExpressions(queryContext, _segment); + BaseProjectOperator projectOperator = + new ProjectPlanNode(new SegmentContext(_segment), queryContext, expressions, + DocIdSetPlanNode.MAX_DOC_PER_CALL).run(); + int numColumnsProjected = projectOperator.getNumColumnsProjected(); + + SelectionOnlyOperator operator = + new SelectionOnlyOperator(_segment, queryContext, expressions, projectOperator); + operator.nextBlock(); + + long docsScanned = scanCost.getNumDocsScanned(); + assertTrue(docsScanned > 0, "Should have scanned some docs"); + assertEquals(scanCost.getNumEntriesScannedPostFilter(), docsScanned * numColumnsProjected); + } + } + + @Test + public void testSelectionOrderByOperatorTracksScanCost() { + QueryContext queryContext = QueryContextConverterUtils.getQueryContext( + "SELECT intCol, stringCol FROM scanCostTestTable ORDER BY intCol LIMIT 10"); + + try (QueryThreadContext ignore = QueryThreadContext.openForSseTest()) { + QueryScanCostContext scanCost = new QueryScanCostContext(); + QueryThreadContext.get().getExecutionContext().setQueryScanCostContext(scanCost); + + List expressions = + SelectionOperatorUtils.extractExpressions(queryContext, _segment); + BaseProjectOperator projectOperator = + new ProjectPlanNode(new SegmentContext(_segment), queryContext, expressions, + DocIdSetPlanNode.MAX_DOC_PER_CALL).run(); + int numColumnsProjected = projectOperator.getNumColumnsProjected(); + + SelectionOrderByOperator operator = + new SelectionOrderByOperator(_segment, queryContext, expressions, projectOperator); + operator.nextBlock(); + + long docsScanned = scanCost.getNumDocsScanned(); + assertTrue(docsScanned > 0, "Should have scanned some docs"); + assertEquals(scanCost.getNumEntriesScannedPostFilter(), docsScanned * numColumnsProjected); + } + } + + @Test + public void testDistinctOperatorTracksScanCost() { + QueryContext queryContext = QueryContextConverterUtils.getQueryContext( + "SELECT DISTINCT stringCol FROM scanCostTestTable"); + + try (QueryThreadContext ignore = QueryThreadContext.openForSseTest()) { + QueryScanCostContext scanCost = new QueryScanCostContext(); + QueryThreadContext.get().getExecutionContext().setQueryScanCostContext(scanCost); + + List expressions = queryContext.getSelectExpressions(); + BaseProjectOperator projectOperator = + new ProjectPlanNode(new SegmentContext(_segment), queryContext, expressions, + DocIdSetPlanNode.MAX_DOC_PER_CALL).run(); + int numColumnsProjected = projectOperator.getNumColumnsProjected(); + + DistinctOperator operator = new DistinctOperator(_segment, queryContext, projectOperator); + operator.nextBlock(); + + long docsScanned = scanCost.getNumDocsScanned(); + assertTrue(docsScanned > 0, "Should have scanned some docs"); + assertEquals(scanCost.getNumEntriesScannedPostFilter(), docsScanned * numColumnsProjected); + } + } + + @Test + public void testScanCostIsZeroWhenContextNotSet() { + QueryContext queryContext = QueryContextConverterUtils.getQueryContext( + "SELECT intCol, stringCol FROM scanCostTestTable LIMIT 10"); + + try (QueryThreadContext ignore = QueryThreadContext.openForSseTest()) { + // Intentionally NOT setting QueryScanCostContext — getScanCostContext() returns null + List expressions = + SelectionOperatorUtils.extractExpressions(queryContext, _segment); + BaseProjectOperator projectOperator = + new ProjectPlanNode(new SegmentContext(_segment), queryContext, expressions, + DocIdSetPlanNode.MAX_DOC_PER_CALL).run(); + + SelectionOnlyOperator operator = + new SelectionOnlyOperator(_segment, queryContext, expressions, projectOperator); + operator.nextBlock(); + // No exception — scan cost tracking is gracefully skipped when context is null + } + } + + @AfterClass + public void tearDown() + throws IOException { + _segment.destroy(); + FileUtils.deleteDirectory(_tempDir); + } +} diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/killing/strategy/ScanEntriesThresholdStrategyTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/killing/strategy/ScanEntriesThresholdStrategyTest.java index eebef3e58811..e64c1985c909 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/query/killing/strategy/ScanEntriesThresholdStrategyTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/query/killing/strategy/ScanEntriesThresholdStrategyTest.java @@ -122,6 +122,64 @@ public void testEitherMetricCanTrigger() { assertTrue(strategy.shouldTerminate(ctx2)); } + @Test + public void testAbovePostFilterThresholdKills() { + ScanEntriesThresholdStrategy strategy = + new ScanEntriesThresholdStrategy(Long.MAX_VALUE, Long.MAX_VALUE, 1000L); + QueryScanCostContext ctx = new QueryScanCostContext(); + ctx.addEntriesScannedPostFilter(1001); + assertTrue(strategy.shouldTerminate(ctx)); + } + + @Test + public void testBelowPostFilterThresholdDoesNotKill() { + ScanEntriesThresholdStrategy strategy = + new ScanEntriesThresholdStrategy(Long.MAX_VALUE, Long.MAX_VALUE, 1000L); + QueryScanCostContext ctx = new QueryScanCostContext(); + ctx.addEntriesScannedPostFilter(999); + assertFalse(strategy.shouldTerminate(ctx)); + } + + @Test + public void testBuildKillReportForPostFilter() { + ScanEntriesThresholdStrategy strategy = + new ScanEntriesThresholdStrategy(Long.MAX_VALUE, Long.MAX_VALUE, 5000L); + QueryScanCostContext ctx = new QueryScanCostContext(); + ctx.addEntriesScannedPostFilter(7500); + + QueryKillReport report = strategy.buildKillReport(ctx, 3L, "q3", "myTable", "cluster"); + assertEquals(report.getTriggeringMetric(), "numEntriesScannedPostFilter"); + assertEquals(report.getActualValue(), 7500L); + assertEquals(report.getThresholdValue(), 5000L); + } + + @Test + public void testPostFilterOverrideViaForQuery() { + ScanEntriesThresholdStrategy strategy = + new ScanEntriesThresholdStrategy(100L, 200L, 300L); + QueryConfig queryConfig = new QueryConfig(null, null, null, null, null, null, null, null, 600L); + + QueryKillingStrategy result = strategy.forQuery(queryConfig, null); + assertTrue(result != strategy); + ScanEntriesThresholdStrategy overridden = (ScanEntriesThresholdStrategy) result; + assertEquals(overridden.getMaxEntriesScannedInFilter(), 100L); + assertEquals(overridden.getMaxDocsScanned(), 200L); + assertEquals(overridden.getMaxEntriesScannedPostFilter(), 600L); + } + + @Test + public void testPostFilterPriorityInBuildKillReport() { + // When both entries-in-filter and post-filter exceed, entries-in-filter takes priority + ScanEntriesThresholdStrategy strategy = + new ScanEntriesThresholdStrategy(100L, Long.MAX_VALUE, 500L); + QueryScanCostContext ctx = new QueryScanCostContext(); + ctx.addEntriesScannedInFilter(200); + ctx.addEntriesScannedPostFilter(1000); + + QueryKillReport report = strategy.buildKillReport(ctx, 4L, "q4", "myTable", "cluster"); + assertEquals(report.getTriggeringMetric(), "numEntriesScannedInFilter"); + } + // --- forQuery() table override tests --- @Test From f11d3d7e7c4023aa7cff6f9ba3f6aa5c8250d8b6 Mon Sep 17 00:00:00 2001 From: Anurag Rai Date: Thu, 14 May 2026 10:28:28 +0530 Subject: [PATCH 4/6] add per-table scanKillingMode override to proactive query killing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Extends QueryConfig with a scanKillingMode field (disabled/logOnly/enforce) that lets individual tables override the cluster-level scan killing mode. Stores the resolved mode as a volatile field on QueryExecutionContext during query init and applies it in QueryKillingManager.checkAndKillWithStrategy() ahead of the cluster config — enabling patterns like cluster=logOnly + table=enforce for targeted enforcement without a cluster-wide mode change. Invalid mode strings are rejected at QueryConfig construction time via Preconditions. Includes 4 new manager tests and 7 new QueryConfig tests. Co-Authored-By: Claude Sonnet 4.6 (1M context) --- .../executor/ServerQueryExecutorV1Impl.java | 11 ++ .../query/killing/QueryKillingManager.java | 10 +- .../killing/QueryKillingManagerTest.java | 105 ++++++++++++++++++ .../pinot/spi/config/table/QueryConfig.java | 27 ++++- .../spi/query/QueryExecutionContext.java | 21 ++++ .../table/QueryConfigScanKillingTest.java | 68 ++++++++++++ 6 files changed, 240 insertions(+), 2 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java index d7a5efcad3ce..ca6c6958d5b4 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java @@ -86,6 +86,7 @@ import org.apache.pinot.spi.query.QueryThreadContext; import org.apache.pinot.spi.trace.Tracer; import org.apache.pinot.spi.trace.Tracing; +import org.apache.pinot.spi.utils.CommonConstants.Accounting.ScanKillingMode; import org.apache.pinot.spi.utils.CommonConstants.Server; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -562,6 +563,16 @@ private void initScanBasedKilling(ServerQueryRequest queryRequest, String tableN if (queryStrategy != null) { execCtx.setCachedKillingStrategy(queryStrategy); } + // Resolve and store per-table kill mode override (null = use cluster mode) + if (queryConfig != null && queryConfig.getScanKillingMode() != null) { + ScanKillingMode tableMode = ScanKillingMode.fromConfigValue(queryConfig.getScanKillingMode()); + if (tableMode != null) { + execCtx.setEffectiveScanKillingMode(tableMode); + } else { + LOGGER.warn("Invalid scanKillingMode '{}' in QueryConfig for table {}, falling back to cluster mode", + queryConfig.getScanKillingMode(), tableNameWithType); + } + } } private void addPrunerStats(InstanceResponseBlock instanceResponse, SegmentPrunerStatistics prunerStats) { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingManager.java b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingManager.java index b5ced1301e64..b305fcb3c03e 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingManager.java @@ -31,6 +31,7 @@ import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.query.QueryExecutionContext; import org.apache.pinot.spi.query.QueryScanCostContext; +import org.apache.pinot.spi.utils.CommonConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -251,10 +252,17 @@ private void checkAndKillWithStrategy(QueryExecutionContext executionContext, Qu if (!queryStrategy.shouldTerminate(scanCostContext)) { return; } + // Resolve effective mode: per-table override takes precedence over cluster config + CommonConstants.Accounting.ScanKillingMode effectiveMode = executionContext.getEffectiveScanKillingMode(); + if (effectiveMode == CommonConstants.Accounting.ScanKillingMode.DISABLED) { + return; + } + boolean logOnly = effectiveMode == CommonConstants.Accounting.ScanKillingMode.LOG_ONLY + || (effectiveMode == null && config.isScanBasedKillingLogOnly()); long requestId = executionContext.getRequestId(); QueryKillReport report = queryStrategy.buildKillReport(scanCostContext, requestId, queryId, tableName, configSource); - if (config.isScanBasedKillingLogOnly()) { + if (logOnly) { LOGGER.info("Query killed in LogOnly mode: {}", report.toInternalLogMessage()); _serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES_KILLED_SCAN_DRY_RUN, 1); return; diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/killing/QueryKillingManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/killing/QueryKillingManagerTest.java index c2d7d4ab2651..1676edb1bf17 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/query/killing/QueryKillingManagerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/query/killing/QueryKillingManagerTest.java @@ -32,6 +32,7 @@ import org.apache.pinot.spi.query.QueryExecutionContext; import org.apache.pinot.spi.query.QueryScanCostContext; import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.CommonConstants.Accounting.ScanKillingMode; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -417,6 +418,110 @@ public void testResolveQueryStrategyReturnsStrategyWhenEnabled() { assertTrue(resolved instanceof ScanEntriesThresholdStrategy); } + @Test + public void testTableModeEnforceOverridesClusterLogOnly() { + // Cluster is logOnly — normally no kills + QueryMonitorConfig config = buildConfig("logOnly", 50L, Long.MAX_VALUE); + AtomicReference configRef = new AtomicReference<>(config); + QueryKillingManager manager = new QueryKillingManager(configRef, _serverMetrics); + manager.rebuildStrategy(); + + QueryExecutionContext execCtx = QueryExecutionContext.forSseTest(); + // Table override: enforce + execCtx.setEffectiveScanKillingMode(ScanKillingMode.ENFORCE); + execCtx.setTableName("testTable_OFFLINE"); + execCtx.setQueryId("tbl-mode-q1"); + + QueryKillingStrategy strategy = manager.resolveQueryStrategy(null); + assertNotNull(strategy); + execCtx.setCachedKillingStrategy(strategy); + + QueryScanCostContext scanCtx = new QueryScanCostContext(); + scanCtx.addEntriesScannedInFilter(100L); // exceeds cluster threshold of 50 + + manager.checkAndKillIfNeeded(execCtx, scanCtx); + assertNotNull(execCtx.getTerminateException(), + "Table enforce override should cause real termination even when cluster is logOnly"); + assertEquals(execCtx.getTerminateException().getErrorCode(), QueryErrorCode.QUERY_SCAN_LIMIT_EXCEEDED); + } + + @Test + public void testTableModeLogOnlyOverridesClusterEnforce() { + // Cluster is enforce — normally kills + QueryMonitorConfig config = buildConfig("enforce", 50L, Long.MAX_VALUE); + AtomicReference configRef = new AtomicReference<>(config); + QueryKillingManager manager = new QueryKillingManager(configRef, _serverMetrics); + manager.rebuildStrategy(); + + QueryExecutionContext execCtx = QueryExecutionContext.forSseTest(); + // Table override: logOnly — should downgrade to dry-run + execCtx.setEffectiveScanKillingMode(ScanKillingMode.LOG_ONLY); + execCtx.setTableName("testTable_OFFLINE"); + execCtx.setQueryId("tbl-mode-q2"); + + QueryKillingStrategy strategy = manager.resolveQueryStrategy(null); + assertNotNull(strategy); + execCtx.setCachedKillingStrategy(strategy); + + QueryScanCostContext scanCtx = new QueryScanCostContext(); + scanCtx.addEntriesScannedInFilter(100L); // exceeds cluster threshold of 50 + + manager.checkAndKillIfNeeded(execCtx, scanCtx); + assertNull(execCtx.getTerminateException(), + "Table logOnly override should prevent real kill even when cluster is enforce"); + } + + @Test + public void testTableModeDisabledOverridesClusterEnforce() { + // Cluster is enforce — normally kills + QueryMonitorConfig config = buildConfig("enforce", 50L, Long.MAX_VALUE); + AtomicReference configRef = new AtomicReference<>(config); + QueryKillingManager manager = new QueryKillingManager(configRef, _serverMetrics); + manager.rebuildStrategy(); + + QueryExecutionContext execCtx = QueryExecutionContext.forSseTest(); + // Table override: disabled — fully exempt + execCtx.setEffectiveScanKillingMode(ScanKillingMode.DISABLED); + execCtx.setTableName("testTable_OFFLINE"); + execCtx.setQueryId("tbl-mode-q3"); + + QueryKillingStrategy strategy = manager.resolveQueryStrategy(null); + assertNotNull(strategy); + execCtx.setCachedKillingStrategy(strategy); + + QueryScanCostContext scanCtx = new QueryScanCostContext(); + scanCtx.addEntriesScannedInFilter(100L); // exceeds cluster threshold of 50 + + manager.checkAndKillIfNeeded(execCtx, scanCtx); + assertNull(execCtx.getTerminateException(), + "Table disabled override should fully exempt the table from killing"); + } + + @Test + public void testNoTableModeOverrideFallsBackToCluster() { + // Cluster is enforce, no table mode override + QueryMonitorConfig config = buildConfig("enforce", 50L, Long.MAX_VALUE); + AtomicReference configRef = new AtomicReference<>(config); + QueryKillingManager manager = new QueryKillingManager(configRef, _serverMetrics); + manager.rebuildStrategy(); + + QueryExecutionContext execCtx = QueryExecutionContext.forSseTest(); + // No setEffectiveScanKillingMode call — should use cluster mode (enforce) + execCtx.setTableName("testTable_OFFLINE"); + execCtx.setQueryId("tbl-mode-q4"); + + QueryKillingStrategy strategy = manager.resolveQueryStrategy(null); + assertNotNull(strategy); + execCtx.setCachedKillingStrategy(strategy); + + QueryScanCostContext scanCtx = new QueryScanCostContext(); + scanCtx.addEntriesScannedInFilter(100L); // exceeds cluster threshold of 50 + + manager.checkAndKillIfNeeded(execCtx, scanCtx); + assertNotNull(execCtx.getTerminateException(), + "Without table mode override, cluster enforce mode should kill"); + } + // --- Test fixtures for pluggable strategy --- /** diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/QueryConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/QueryConfig.java index 093b7c340fa9..92fb399f5842 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/QueryConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/QueryConfig.java @@ -24,6 +24,7 @@ import java.util.Map; import javax.annotation.Nullable; import org.apache.pinot.spi.config.BaseJsonConfig; +import org.apache.pinot.spi.utils.CommonConstants.Accounting.ScanKillingMode; /** @@ -62,6 +63,10 @@ public class QueryConfig extends BaseJsonConfig { private final Long _maxEntriesScannedPostFilter; + // Per-table scan-based killing mode override. Null means use the cluster-level mode. + // Valid values: "disabled", "logOnly", "enforce" (case-insensitive). + private final String _scanKillingMode; + public QueryConfig(@Nullable Long timeoutMs, @Nullable Boolean disableGroovy, @Nullable Boolean useApproximateFunction, @Nullable Map expressionOverrideMap, @@ -70,6 +75,16 @@ public QueryConfig(@Nullable Long timeoutMs, @Nullable Boolean disableGroovy, maxQueryResponseSizeBytes, maxServerResponseSizeBytes, null, null, null); } + public QueryConfig(@Nullable Long timeoutMs, @Nullable Boolean disableGroovy, + @Nullable Boolean useApproximateFunction, @Nullable Map expressionOverrideMap, + @Nullable Long maxQueryResponseSizeBytes, @Nullable Long maxServerResponseSizeBytes, + @Nullable Long maxEntriesScannedInFilter, @Nullable Long maxDocsScanned, + @Nullable Long maxEntriesScannedPostFilter) { + this(timeoutMs, disableGroovy, useApproximateFunction, expressionOverrideMap, + maxQueryResponseSizeBytes, maxServerResponseSizeBytes, maxEntriesScannedInFilter, maxDocsScanned, + maxEntriesScannedPostFilter, null); + } + @JsonCreator public QueryConfig(@JsonProperty("timeoutMs") @Nullable Long timeoutMs, @JsonProperty("disableGroovy") @Nullable Boolean disableGroovy, @@ -79,7 +94,8 @@ public QueryConfig(@JsonProperty("timeoutMs") @Nullable Long timeoutMs, @JsonProperty("maxServerResponseSizeBytes") @Nullable Long maxServerResponseSizeBytes, @JsonProperty("maxEntriesScannedInFilter") @Nullable Long maxEntriesScannedInFilter, @JsonProperty("maxDocsScanned") @Nullable Long maxDocsScanned, - @JsonProperty("maxEntriesScannedPostFilter") @Nullable Long maxEntriesScannedPostFilter) { + @JsonProperty("maxEntriesScannedPostFilter") @Nullable Long maxEntriesScannedPostFilter, + @JsonProperty("scanKillingMode") @Nullable String scanKillingMode) { Preconditions.checkArgument(timeoutMs == null || timeoutMs > 0, "Invalid 'timeoutMs': %s", timeoutMs); Preconditions.checkArgument(maxQueryResponseSizeBytes == null || maxQueryResponseSizeBytes > 0, "Invalid 'maxQueryResponseSizeBytes': %s", maxQueryResponseSizeBytes); @@ -91,6 +107,8 @@ public QueryConfig(@JsonProperty("timeoutMs") @Nullable Long timeoutMs, "Invalid 'maxDocsScanned': %s", maxDocsScanned); Preconditions.checkArgument(maxEntriesScannedPostFilter == null || maxEntriesScannedPostFilter > 0, "Invalid 'maxEntriesScannedPostFilter': %s", maxEntriesScannedPostFilter); + Preconditions.checkArgument(scanKillingMode == null || ScanKillingMode.fromConfigValue(scanKillingMode) != null, + "Invalid 'scanKillingMode': %s. Valid values: disabled, logOnly, enforce", scanKillingMode); _timeoutMs = timeoutMs; _disableGroovy = disableGroovy; @@ -101,6 +119,7 @@ public QueryConfig(@JsonProperty("timeoutMs") @Nullable Long timeoutMs, _maxEntriesScannedInFilter = maxEntriesScannedInFilter; _maxDocsScanned = maxDocsScanned; _maxEntriesScannedPostFilter = maxEntriesScannedPostFilter; + _scanKillingMode = scanKillingMode; } @Nullable @@ -156,4 +175,10 @@ public Long getMaxDocsScanned() { public Long getMaxEntriesScannedPostFilter() { return _maxEntriesScannedPostFilter; } + + @Nullable + @JsonProperty("scanKillingMode") + public String getScanKillingMode() { + return _scanKillingMode; + } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryExecutionContext.java b/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryExecutionContext.java index b4e8d625d598..0ccb0da03075 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryExecutionContext.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryExecutionContext.java @@ -78,6 +78,9 @@ public enum QueryType { @Nullable private volatile String _queryId; + @Nullable + private volatile Accounting.ScanKillingMode _effectiveScanKillingMode; + public QueryExecutionContext(QueryType queryType, long requestId, String cid, String workloadName, long startTimeMs, long activeDeadlineMs, long passiveDeadlineMs, String brokerId, String instanceId, String queryHash) { _queryType = queryType; @@ -242,4 +245,22 @@ public String getQueryId() { public void setQueryId(@Nullable String queryId) { _queryId = queryId; } + + /** + * Returns the per-table scan killing mode override set for this query, or {@code null} if no + * table-level override is configured. When {@code null}, the cluster-level mode from + * {@link org.apache.pinot.spi.utils.CommonConstants.Accounting} applies. + */ + @Nullable + public Accounting.ScanKillingMode getEffectiveScanKillingMode() { + return _effectiveScanKillingMode; + } + + /** + * Sets the per-table scan killing mode for this query. Pass {@code null} to fall back to the + * cluster-level mode. Called once during query initialization; thread-safe via {@code volatile}. + */ + public void setEffectiveScanKillingMode(@Nullable Accounting.ScanKillingMode effectiveScanKillingMode) { + _effectiveScanKillingMode = effectiveScanKillingMode; + } } diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/QueryConfigScanKillingTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/QueryConfigScanKillingTest.java index 5d6ea187d2dc..fce5748652c4 100644 --- a/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/QueryConfigScanKillingTest.java +++ b/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/QueryConfigScanKillingTest.java @@ -19,6 +19,7 @@ package org.apache.pinot.spi.config.table; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.pinot.spi.utils.CommonConstants.Accounting.ScanKillingMode; import org.testng.annotations.Test; import static org.testng.Assert.assertEquals; @@ -92,4 +93,71 @@ public void testNegativeMaxEntriesScannedInFilterThrows() { public void testZeroMaxDocsScannedThrows() { new QueryConfig(null, null, null, null, null, null, null, 0L, null); } + + @Test + public void testScanKillingModeDefaultsToNull() { + QueryConfig config = new QueryConfig(null, null, null, null, null, null, null, null, null); + assertNull(config.getScanKillingMode()); + } + + @Test + public void testScanKillingModeSetExplicitly() { + QueryConfig config = new QueryConfig(null, null, null, null, null, null, null, null, null, "enforce"); + assertEquals(config.getScanKillingMode(), "enforce"); + } + + @Test + public void testScanKillingModeJsonRoundTrip() + throws Exception { + QueryConfig config = new QueryConfig(null, null, null, null, null, null, 500_000L, null, null, "logOnly"); + + String json = OBJECT_MAPPER.writeValueAsString(config); + QueryConfig deserialized = OBJECT_MAPPER.readValue(json, QueryConfig.class); + + assertEquals(deserialized.getScanKillingMode(), "logOnly"); + assertEquals(deserialized.getMaxEntriesScannedInFilter(), Long.valueOf(500_000L)); + } + + @Test + public void testScanKillingModeDeserializesFromJson() + throws Exception { + String json = "{\"maxDocsScanned\": 5000000, \"scanKillingMode\": \"disabled\"}"; + QueryConfig config = OBJECT_MAPPER.readValue(json, QueryConfig.class); + assertEquals(config.getScanKillingMode(), "disabled"); + assertEquals(config.getMaxDocsScanned(), Long.valueOf(5_000_000L)); + } + + @Test + public void testScanKillingModeAbsentInJsonDeserializesToNull() + throws Exception { + String json = "{\"maxDocsScanned\": 5000000}"; + QueryConfig config = OBJECT_MAPPER.readValue(json, QueryConfig.class); + assertNull(config.getScanKillingMode()); + } + + @Test + public void testScanKillingModeStringParsesToCorrectEnum() { + // Verify the production parse path: getScanKillingMode() string → ScanKillingMode enum + QueryConfig enforce = new QueryConfig(null, null, null, null, null, null, null, null, null, "enforce"); + assertEquals(ScanKillingMode.fromConfigValue(enforce.getScanKillingMode()), ScanKillingMode.ENFORCE); + + QueryConfig logOnly = new QueryConfig(null, null, null, null, null, null, null, null, null, "logOnly"); + assertEquals(ScanKillingMode.fromConfigValue(logOnly.getScanKillingMode()), ScanKillingMode.LOG_ONLY); + + QueryConfig disabled = new QueryConfig(null, null, null, null, null, null, null, null, null, "disabled"); + assertEquals(ScanKillingMode.fromConfigValue(disabled.getScanKillingMode()), ScanKillingMode.DISABLED); + } + + @Test + public void testScanKillingModeCaseInsensitiveParsing() { + // fromConfigValue is case-insensitive, so "Enforce" is valid + QueryConfig config = new QueryConfig(null, null, null, null, null, null, null, null, null, "Enforce"); + assertEquals(ScanKillingMode.fromConfigValue(config.getScanKillingMode()), ScanKillingMode.ENFORCE); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testInvalidScanKillingModeThrowsAtConstruction() { + // A completely unrecognized value fails at construction time, not silently at query time + new QueryConfig(null, null, null, null, null, null, null, null, null, "enforced"); + } } From 1f299951e611eb634a1ee90501f2256887d520df Mon Sep 17 00:00:00 2001 From: Anurag Rai Date: Thu, 14 May 2026 15:13:44 +0530 Subject: [PATCH 5/6] fix dynamic ZK config changes not being picked up by QueryKillingManager QueryKillingManager.onChange() received raw ZK keys with the full "pinot.query.scheduler." prefix but passed them directly to QueryMonitorConfig's update constructor, which checks for keys without that prefix (e.g. "accounting.scan.based.killing.mode"). The mismatch caused all dynamic config changes to be silently ignored, requiring a server restart for scan-killing config to take effect. Strip the prefix before passing to QueryMonitorConfig, matching the key space the init constructor uses. Add early return when no relevant keys changed, and log the applied config values after rebuild. Co-Authored-By: Claude Sonnet 4.6 (1M context) --- .../query/killing/QueryKillingManager.java | 38 +++++++++++++++- .../killing/QueryKillingManagerTest.java | 44 +++++++++++++++---- 2 files changed, 72 insertions(+), 10 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingManager.java b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingManager.java index b305fcb3c03e..de526ac0d611 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingManager.java @@ -18,6 +18,8 @@ */ package org.apache.pinot.core.query.killing; +import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; @@ -141,12 +143,44 @@ public QueryKillingStrategy getActiveStrategy() { /** * Handles ZK cluster config changes. Rebuilds the {@link QueryMonitorConfig} from the delta * and refreshes the killing strategy if scan-killing-related keys changed. + * + *

Raw ZK keys arrive with the full {@value CommonConstants#PINOT_QUERY_SCHEDULER_PREFIX} + * prefix. We strip it before passing to {@link QueryMonitorConfig}, matching the key space + * the init constructor uses (which reads from a config already subsetted to that prefix).

*/ @Override public synchronized void onChange(Set changedConfigs, Map clusterConfigs) { - QueryMonitorConfig updated = new QueryMonitorConfig(_configRef.get(), changedConfigs, clusterConfigs); - _configRef.set(updated); + String prefix = CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "."; + int prefixLen = prefix.length(); + + Set filteredChangedConfigs = new HashSet<>(); + for (String key : changedConfigs) { + if (key.startsWith(prefix)) { + filteredChangedConfigs.add(key.substring(prefixLen)); + } + } + + if (filteredChangedConfigs.isEmpty()) { + return; + } + + Map filteredClusterConfigs = new HashMap<>(); + for (Map.Entry entry : clusterConfigs.entrySet()) { + if (entry.getKey().startsWith(prefix)) { + filteredClusterConfigs.put(entry.getKey().substring(prefixLen), entry.getValue()); + } + } + + QueryMonitorConfig oldConfig = _configRef.get(); + QueryMonitorConfig newConfig = new QueryMonitorConfig(oldConfig, filteredChangedConfigs, filteredClusterConfigs); + _configRef.set(newConfig); rebuildStrategy(); + LOGGER.info("Scan-based killing config updated: mode={}, maxEntriesScannedInFilter={}, " + + "maxDocsScanned={}, maxEntriesScannedPostFilter={}", + newConfig.getScanBasedKillingMode(), + newConfig.getScanBasedKillingMaxEntriesScannedInFilter(), + newConfig.getScanBasedKillingMaxDocsScanned(), + newConfig.getScanBasedKillingMaxEntriesScannedPostFilter()); } /** diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/killing/QueryKillingManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/killing/QueryKillingManagerTest.java index 1676edb1bf17..782d6d1ef1c0 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/query/killing/QueryKillingManagerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/query/killing/QueryKillingManagerTest.java @@ -309,14 +309,18 @@ public void testOnChangeRebuildsStrategy() { manager.rebuildStrategy(); assertNull(manager.getActiveStrategy(), "Strategy should be null when disabled"); - // Simulate cluster config change enabling killing with enforce mode + threshold + // Simulate cluster config change enabling killing with enforce mode + threshold. + // Keys arrive from ZK with full "pinot.query.scheduler." prefix — onChange() strips it. + String prefix = CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "."; + Set changedKeys = new HashSet<>(); - changedKeys.add(CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE); - changedKeys.add(CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_IN_FILTER); + changedKeys.add(prefix + CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE); + changedKeys.add(prefix + CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_IN_FILTER); Map clusterConfigs = new HashMap<>(); - clusterConfigs.put(CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE, "enforce"); - clusterConfigs.put(CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_IN_FILTER, "500"); + clusterConfigs.put(prefix + CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE, "enforce"); + clusterConfigs.put( + prefix + CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_IN_FILTER, "500"); manager.onChange(changedKeys, clusterConfigs); assertNotNull(manager.getActiveStrategy(), @@ -332,18 +336,42 @@ public void testOnChangeDisablesStrategy() { manager.rebuildStrategy(); assertNotNull(manager.getActiveStrategy(), "Strategy should be active when enabled"); - // Simulate cluster config change to disable killing + // Simulate cluster config change to disable killing (full ZK-prefixed keys) + String prefix = CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "."; + Set changedKeys = new HashSet<>(); - changedKeys.add(CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE); + changedKeys.add(prefix + CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE); Map clusterConfigs = new HashMap<>(); - clusterConfigs.put(CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE, "disabled"); + clusterConfigs.put(prefix + CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE, "disabled"); manager.onChange(changedKeys, clusterConfigs); assertNull(manager.getActiveStrategy(), "Strategy should be null after onChange disables killing"); } + @Test + public void testOnChangeIgnoresIrrelevantKeys() { + // Start with killing enabled + QueryMonitorConfig enabledConfig = buildConfig("enforce", 100L, Long.MAX_VALUE); + AtomicReference configRef = new AtomicReference<>(enabledConfig); + QueryKillingManager manager = new QueryKillingManager(configRef, _serverMetrics); + manager.rebuildStrategy(); + assertNotNull(manager.getActiveStrategy(), "Strategy should be active when enabled"); + + // Simulate a ZK change that only touches non-scheduler keys — should be ignored + Set changedKeys = new HashSet<>(); + changedKeys.add("some.unrelated.config"); + changedKeys.add("helix.rebalance.something"); + + Map clusterConfigs = new HashMap<>(); + clusterConfigs.put("some.unrelated.config", "value"); + + manager.onChange(changedKeys, clusterConfigs); + assertNotNull(manager.getActiveStrategy(), + "Strategy should remain unchanged when no scheduler keys changed"); + } + // --- Convenience overload (2-arg checkAndKillIfNeeded) --- @Test From 87f0332d21eb06fdfb5ae6d2c3c1986f4844adfc Mon Sep 17 00:00:00 2001 From: Anurag Rai Date: Thu, 14 May 2026 19:14:50 +0530 Subject: [PATCH 6/6] switch server metrics for query killing to table specific and fall back to global when table name unavailable --- .../query/killing/QueryKillingManager.java | 20 ++- .../killing/QueryKillingManagerTest.java | 140 +++++++++++++++++- 2 files changed, 155 insertions(+), 5 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingManager.java b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingManager.java index de526ac0d611..ea7da6ff843b 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingManager.java @@ -232,7 +232,7 @@ public void checkAndKillIfNeeded(QueryExecutionContext executionContext, config); } catch (Exception e) { LOGGER.error("Error in scan-based killing evaluation for query {}", queryId, e); - _serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES_KILLED_SCAN_ERROR, 1); + emitKillMetric(ServerMeter.QUERIES_KILLED_SCAN_ERROR, tableName); } } @@ -276,7 +276,7 @@ private void checkAndKillIfNeeded(QueryExecutionContext executionContext, QueryS resolvedTableName, config); } catch (Exception e) { LOGGER.error("Error in scan-based killing evaluation for query {}", resolvedQueryId, e); - _serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES_KILLED_SCAN_ERROR, 1); + emitKillMetric(ServerMeter.QUERIES_KILLED_SCAN_ERROR, resolvedTableName); } } @@ -298,11 +298,23 @@ private void checkAndKillWithStrategy(QueryExecutionContext executionContext, Qu configSource); if (logOnly) { LOGGER.info("Query killed in LogOnly mode: {}", report.toInternalLogMessage()); - _serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES_KILLED_SCAN_DRY_RUN, 1); + emitKillMetric(ServerMeter.QUERIES_KILLED_SCAN_DRY_RUN, report.getTableName()); return; } LOGGER.warn("Query Killed in enforce mode: {}", report.toInternalLogMessage()); executionContext.terminate(queryStrategy.getErrorCode(), report.toCustomerMessage()); - _serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES_KILLED_SCAN, 1); + emitKillMetric(ServerMeter.QUERIES_KILLED_SCAN, report.getTableName()); + } + + /** + * Emits a kill metric per-table when the table name is known, falling back to global emission + * when it is not. + */ + private void emitKillMetric(ServerMeter meter, @Nullable String tableName) { + if (tableName != null && !tableName.isEmpty() && !"unknown".equals(tableName)) { + _serverMetrics.addMeteredTableValue(tableName, meter, 1); + } else { + _serverMetrics.addMeteredGlobalValue(meter, 1); + } } } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/killing/QueryKillingManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/killing/QueryKillingManagerTest.java index 782d6d1ef1c0..ffe44d7f94c5 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/query/killing/QueryKillingManagerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/query/killing/QueryKillingManagerTest.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; +import org.apache.pinot.common.metrics.ServerMeter; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.core.accounting.QueryMonitorConfig; import org.apache.pinot.core.query.killing.strategy.ScanEntriesThresholdStrategy; @@ -36,7 +37,12 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; @@ -550,7 +556,139 @@ public void testNoTableModeOverrideFallsBackToCluster() { "Without table mode override, cluster enforce mode should kill"); } - // --- Test fixtures for pluggable strategy --- + // --- Per-table metric emission --- + + @Test + public void testEnforceKillEmitsPerTableMetric() { + QueryMonitorConfig config = buildConfig("enforce", 100L, Long.MAX_VALUE); + AtomicReference configRef = new AtomicReference<>(config); + QueryKillingManager manager = new QueryKillingManager(configRef, _serverMetrics); + manager.rebuildStrategy(); + + QueryExecutionContext execCtx = QueryExecutionContext.forSseTest(); + QueryScanCostContext scanCtx = new QueryScanCostContext(); + scanCtx.addEntriesScannedInFilter(200L); + + manager.checkAndKillIfNeeded(execCtx, scanCtx, "q-metric-1", "myTable_REALTIME", null); + + verify(_serverMetrics).addMeteredTableValue("myTable_REALTIME", ServerMeter.QUERIES_KILLED_SCAN, 1L); + verify(_serverMetrics, never()).addMeteredGlobalValue(ServerMeter.QUERIES_KILLED_SCAN, 1L); + } + + @Test + public void testLogOnlyKillEmitsPerTableDryRunMetric() { + QueryMonitorConfig config = buildConfig("logOnly", 100L, Long.MAX_VALUE); + AtomicReference configRef = new AtomicReference<>(config); + QueryKillingManager manager = new QueryKillingManager(configRef, _serverMetrics); + manager.rebuildStrategy(); + + QueryExecutionContext execCtx = QueryExecutionContext.forSseTest(); + QueryScanCostContext scanCtx = new QueryScanCostContext(); + scanCtx.addEntriesScannedInFilter(200L); + + manager.checkAndKillIfNeeded(execCtx, scanCtx, "q-metric-2", "myTable_REALTIME", null); + + verify(_serverMetrics).addMeteredTableValue("myTable_REALTIME", ServerMeter.QUERIES_KILLED_SCAN_DRY_RUN, 1L); + verify(_serverMetrics, never()).addMeteredGlobalValue(ServerMeter.QUERIES_KILLED_SCAN_DRY_RUN, 1L); + // logOnly should not actually terminate + assertNull(execCtx.getTerminateException()); + } + + @Test + public void testNullTableNameFallsBackToGlobalErrorMetric() { + // When the strategy throws inside checkAndKillIfNeeded, the catch block emits the error + // metric. With a null table name, the helper falls back to global emission so we do not + // silently drop the error signal. + QueryMonitorConfig config = buildConfig("enforce", 100L, Long.MAX_VALUE); + AtomicReference configRef = new AtomicReference<>(config); + QueryKillingManager manager = new QueryKillingManager(configRef, _serverMetrics); + manager.rebuildStrategy(); + + QueryExecutionContext execCtx = QueryExecutionContext.forSseTest(); + // Force the catch path with a cached strategy that throws inside shouldTerminate. + execCtx.setCachedKillingStrategy(new QueryKillingStrategy() { + @Override + public boolean shouldTerminate(QueryScanCostContext context) { + throw new RuntimeException("boom"); + } + + @Override + public QueryKillReport buildKillReport(QueryScanCostContext context, long requestId, + String queryId, String tableName, String configSource) { + // unused — shouldTerminate throws first + throw new UnsupportedOperationException(); + } + }); + execCtx.setTableName(null); + execCtx.setQueryId("q-metric-null"); + + manager.checkAndKillIfNeeded(execCtx, new QueryScanCostContext()); + + // Null table name → global fallback emission, never per-table + verify(_serverMetrics).addMeteredGlobalValue(ServerMeter.QUERIES_KILLED_SCAN_ERROR, 1L); + verify(_serverMetrics, never()).addMeteredTableValue(anyString(), + eq(ServerMeter.QUERIES_KILLED_SCAN_ERROR), anyLong()); + } + + @Test + public void testNullTableNameInReportFallsBackToGlobalEnforceMetric() { + QueryMonitorConfig config = buildConfig("enforce", 100L, Long.MAX_VALUE); + AtomicReference configRef = new AtomicReference<>(config); + QueryKillingManager manager = new QueryKillingManager(configRef, _serverMetrics); + manager.rebuildStrategy(); + + QueryExecutionContext execCtx = QueryExecutionContext.forSseTest(); + execCtx.setCachedKillingStrategy(new QueryKillingStrategy() { + @Override + public boolean shouldTerminate(QueryScanCostContext context) { + return true; + } + + @Override + public QueryKillReport buildKillReport(QueryScanCostContext context, long requestId, + String queryId, String tableName, String configSource) { + // Intentionally drop the table name to exercise the null-fallback path + return new QueryKillReport(requestId, queryId, null, "TestStrategy", "test", 0, 0, + configSource, context); + } + + @Override + public org.apache.pinot.spi.exception.QueryErrorCode getErrorCode() { + return org.apache.pinot.spi.exception.QueryErrorCode.QUERY_SCAN_LIMIT_EXCEEDED; + } + }); + execCtx.setTableName(null); + execCtx.setQueryId("q-null-report"); + + manager.checkAndKillIfNeeded(execCtx, new QueryScanCostContext()); + + verify(_serverMetrics).addMeteredGlobalValue(ServerMeter.QUERIES_KILLED_SCAN, 1L); + verify(_serverMetrics, never()).addMeteredTableValue(anyString(), + eq(ServerMeter.QUERIES_KILLED_SCAN), anyLong()); + } + + @Test + public void testUnknownTableNameSentinelFallsBackToGlobalMetric() { + QueryMonitorConfig config = buildConfig("enforce", 100L, Long.MAX_VALUE); + AtomicReference configRef = new AtomicReference<>(config); + QueryKillingManager manager = new QueryKillingManager(configRef, _serverMetrics); + manager.rebuildStrategy(); + + QueryExecutionContext execCtx = QueryExecutionContext.forSseTest(); + QueryScanCostContext scanCtx = new QueryScanCostContext(); + scanCtx.addEntriesScannedInFilter(200L); + + // Use the convenience overload with null tableName → routes through "unknown" sentinel + execCtx.setTableName(null); + execCtx.setQueryId("q-unknown"); + execCtx.setCachedKillingStrategy(manager.resolveQueryStrategy(null)); + + manager.checkAndKillIfNeeded(execCtx, scanCtx); + + verify(_serverMetrics).addMeteredGlobalValue(ServerMeter.QUERIES_KILLED_SCAN, 1L); + verify(_serverMetrics, never()).addMeteredTableValue(eq("unknown"), + eq(ServerMeter.QUERIES_KILLED_SCAN), anyLong()); + } /** * A test strategy that always kills — used to verify custom factory loading.