diff --git a/pinot-core/src/main/java/org/apache/pinot/core/accounting/QueryMonitorConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/accounting/QueryMonitorConfig.java index 920648ca5061..c5bfcde27adf 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/accounting/QueryMonitorConfig.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/accounting/QueryMonitorConfig.java @@ -158,7 +158,8 @@ public QueryMonitorConfig(PinotConfiguration config, long maxHeapSize) { (String) null); } - QueryMonitorConfig(QueryMonitorConfig oldConfig, Set changedConfigs, Map clusterConfigs) { + public QueryMonitorConfig(QueryMonitorConfig oldConfig, Set changedConfigs, + Map clusterConfigs) { _maxHeapSize = oldConfig._maxHeapSize; if (changedConfigs.contains(Accounting.Keys.MIN_MEMORY_FOOTPRINT_TO_KILL_RATIO)) { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseOperator.java index a99855a07a7a..605e4dae91cb 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseOperator.java @@ -25,6 +25,10 @@ import org.apache.pinot.core.common.Block; import org.apache.pinot.core.common.Operator; import org.apache.pinot.core.plan.ExplainInfo; +import org.apache.pinot.core.query.killing.QueryKillingManager; +import org.apache.pinot.spi.exception.TerminationException; +import org.apache.pinot.spi.query.QueryExecutionContext; +import org.apache.pinot.spi.query.QueryScanCostContext; import org.apache.pinot.spi.query.QueryThreadContext; import org.apache.pinot.spi.trace.InvocationScope; import org.apache.pinot.spi.trace.Tracing; @@ -55,10 +59,36 @@ public ExplainInfo getExplainInfo() { protected void checkTermination() { QueryThreadContext.checkTermination(this::getExplainName); + checkScanBasedKilling(); } protected void checkTerminationAndSampleUsage() { QueryThreadContext.checkTerminationAndSampleUsage(this::getExplainName); + checkScanBasedKilling(); + } + + private void checkScanBasedKilling() { + QueryKillingManager killingManager = QueryKillingManager.getInstance(); + if (killingManager != null) { + QueryThreadContext ctx = QueryThreadContext.getIfAvailable(); + if (ctx != null) { + QueryExecutionContext execCtx = ctx.getExecutionContext(); + QueryScanCostContext scanCost = execCtx.getQueryScanCostContext(); + if (scanCost != null) { + killingManager.checkAndKillIfNeeded(execCtx, scanCost); + TerminationException te = execCtx.getTerminateException(); + if (te != null) { + throw te; + } + } + } + } + } + + @javax.annotation.Nullable + protected static QueryScanCostContext getScanCostContext() { + QueryThreadContext ctx = QueryThreadContext.getIfAvailable(); + return ctx != null ? ctx.getExecutionContext().getQueryScanCostContext() : null; } protected List getChildrenExplainInfo() { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/DocIdSetOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/DocIdSetOperator.java index f7d87453a0f5..c96936c36829 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/DocIdSetOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/DocIdSetOperator.java @@ -28,6 +28,7 @@ import org.apache.pinot.core.operator.filter.BaseFilterOperator; import org.apache.pinot.core.plan.DocIdSetPlanNode; import org.apache.pinot.segment.spi.Constants; +import org.apache.pinot.spi.query.QueryScanCostContext; import org.apache.pinot.spi.query.QueryThreadContext; @@ -49,6 +50,7 @@ public class DocIdSetOperator extends BaseDocIdSetOperator { private BlockDocIdSet _blockDocIdSet; private BlockDocIdIterator _blockDocIdIterator; private int _currentDocId = 0; + private long _lastReportedEntriesScanned = 0; public DocIdSetOperator(BaseFilterOperator filterOperator, int maxSizeOfDocIdSet) { Preconditions.checkArgument(maxSizeOfDocIdSet > 0 && maxSizeOfDocIdSet <= DocIdSetPlanNode.MAX_DOC_PER_CALL); @@ -80,6 +82,16 @@ protected DocIdSetBlock getNextBlock() { docIds[pos++] = _currentDocId; } if (pos > 0) { + // Push scan cost delta for proactive query killing + QueryScanCostContext scanCost = getScanCostContext(); + if (scanCost != null) { + long currentTotal = _blockDocIdSet.getNumEntriesScannedInFilter(); + long delta = currentTotal - _lastReportedEntriesScanned; + if (delta > 0) { + scanCost.addEntriesScannedInFilter(delta); + _lastReportedEntriesScanned = currentTotal; + } + } return new DocIdSetBlock(docIds, pos); } else { return null; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationOperator.java index 31ef246eb328..5035f6ea63c7 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationOperator.java @@ -35,6 +35,7 @@ import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils.AggregationInfo; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.core.startree.executor.StarTreeAggregationExecutor; +import org.apache.pinot.spi.query.QueryScanCostContext; /** @@ -72,6 +73,12 @@ protected AggregationResultsBlock getNextBlock() { ValueBlock valueBlock; while ((valueBlock = _projectOperator.nextBlock()) != null) { _numDocsScanned += valueBlock.getNumDocs(); + QueryScanCostContext scanCost = getScanCostContext(); + if (scanCost != null) { + scanCost.addDocsScanned(valueBlock.getNumDocs()); + scanCost.addEntriesScannedPostFilter( + (long) valueBlock.getNumDocs() * _projectOperator.getNumColumnsProjected()); + } aggregationExecutor.aggregate(valueBlock); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DistinctOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DistinctOperator.java index 70b51ca458dd..93c00d76bde7 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DistinctOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DistinctOperator.java @@ -33,6 +33,7 @@ import org.apache.pinot.core.query.distinct.DistinctExecutorFactory; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.segment.spi.IndexSegment; +import org.apache.pinot.spi.query.QueryScanCostContext; /** @@ -60,6 +61,12 @@ protected DistinctResultsBlock getNextBlock() { ValueBlock valueBlock; while ((valueBlock = _projectOperator.nextBlock()) != null) { _numDocsScanned += valueBlock.getNumDocs(); + QueryScanCostContext scanCost = getScanCostContext(); + if (scanCost != null) { + scanCost.addDocsScanned(valueBlock.getNumDocs()); + scanCost.addEntriesScannedPostFilter( + (long) valueBlock.getNumDocs() * _projectOperator.getNumColumnsProjected()); + } if (executor.process(valueBlock)) { break; } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredAggregationOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredAggregationOperator.java index 21697367a5a8..bfc56b025221 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredAggregationOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredAggregationOperator.java @@ -35,6 +35,7 @@ import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils.AggregationInfo; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.core.startree.executor.StarTreeAggregationExecutor; +import org.apache.pinot.spi.query.QueryScanCostContext; /** @@ -95,6 +96,12 @@ protected AggregationResultsBlock getNextBlock() { result[resultIndexMap.get(aggregationFunctions[i])] = filteredResult.get(i); } _numDocsScanned += numDocsScanned; + QueryScanCostContext scanCost = getScanCostContext(); + if (scanCost != null) { + scanCost.addDocsScanned(numDocsScanned); + scanCost.addEntriesScannedPostFilter( + (long) numDocsScanned * projectOperator.getNumColumnsProjected()); + } _numEntriesScannedInFilter += projectOperator.getExecutionStatistics().getNumEntriesScannedInFilter(); _numEntriesScannedPostFilter += (long) numDocsScanned * projectOperator.getNumColumnsProjected(); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java index d0555b6e39c2..0cb56dfe5eb1 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java @@ -48,6 +48,7 @@ import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.core.startree.executor.StarTreeGroupByExecutor; +import org.apache.pinot.spi.query.QueryScanCostContext; import org.apache.pinot.spi.trace.Tracing; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -160,6 +161,12 @@ protected GroupByResultsBlock getNextBlock() { } _numDocsScanned += numDocsScanned; + QueryScanCostContext scanCost = getScanCostContext(); + if (scanCost != null) { + scanCost.addDocsScanned(numDocsScanned); + scanCost.addEntriesScannedPostFilter( + (long) numDocsScanned * projectOperator.getNumColumnsProjected()); + } _numEntriesScannedInFilter += projectOperator.getExecutionStatistics().getNumEntriesScannedInFilter(); _numEntriesScannedPostFilter += (long) numDocsScanned * projectOperator.getNumColumnsProjected(); GroupByResultHolder[] filterGroupByResults = groupByExecutor.getGroupByResultHolders(); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/GroupByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/GroupByOperator.java index bd6f58095f3d..3465484b6521 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/GroupByOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/GroupByOperator.java @@ -42,6 +42,7 @@ import org.apache.pinot.core.query.aggregation.groupby.GroupByExecutor; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.core.startree.executor.StarTreeGroupByExecutor; +import org.apache.pinot.spi.query.QueryScanCostContext; import org.apache.pinot.spi.trace.Tracing; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -119,6 +120,12 @@ protected GroupByResultsBlock getNextBlock() { while ((valueBlock = _projectOperator.nextBlock()) != null) { _numDocsScanned += valueBlock.getNumDocs(); + QueryScanCostContext scanCost = getScanCostContext(); + if (scanCost != null) { + scanCost.addDocsScanned(valueBlock.getNumDocs()); + scanCost.addEntriesScannedPostFilter( + (long) valueBlock.getNumDocs() * _projectOperator.getNumColumnsProjected()); + } groupByExecutor.process(valueBlock); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java index 8527f9dc0329..6d7d6bac9d11 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java @@ -38,6 +38,7 @@ import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.core.query.selection.SelectionOperatorUtils; import org.apache.pinot.segment.spi.IndexSegment; +import org.apache.pinot.spi.query.QueryScanCostContext; import org.roaringbitmap.RoaringBitmap; @@ -124,6 +125,12 @@ protected SelectionResultsBlock getNextBlock() { int numDocsToAdd = Math.min(_numRowsToKeep - _rows.size(), valueBlock.getNumDocs()); _rows.ensureCapacity(_rows.size() + numDocsToAdd); _numDocsScanned += numDocsToAdd; + QueryScanCostContext scanCost = getScanCostContext(); + if (scanCost != null) { + scanCost.addDocsScanned(numDocsToAdd); + scanCost.addEntriesScannedPostFilter( + (long) numDocsToAdd * _projectOperator.getNumColumnsProjected()); + } if (_nullHandlingEnabled) { for (int i = 0; i < numExpressions; i++) { _nullBitmaps[i] = _blockValSets[i].getNullBitmap(); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java index 5db3e9149306..d088d721abd7 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java @@ -52,6 +52,7 @@ import org.apache.pinot.core.query.utils.OrderByComparatorFactory; import org.apache.pinot.segment.spi.IndexSegment; import org.apache.pinot.segment.spi.datasource.DataSource; +import org.apache.pinot.spi.query.QueryScanCostContext; import org.roaringbitmap.RoaringBitmap; @@ -189,6 +190,11 @@ private SelectionResultsBlock computeAllOrdered() { } } _numDocsScanned += numDocsFetched; + QueryScanCostContext scanCost = getScanCostContext(); + if (scanCost != null) { + scanCost.addDocsScanned(numDocsFetched); + scanCost.addEntriesScannedPostFilter((long) numDocsFetched * numColumnsProjected); + } } _numEntriesScannedPostFilter = (long) _numDocsScanned * numColumnsProjected; @@ -253,6 +259,11 @@ private SelectionResultsBlock computePartiallyOrdered() { } } _numDocsScanned += numDocsFetched; + QueryScanCostContext scanCost2 = getScanCostContext(); + if (scanCost2 != null) { + scanCost2.addDocsScanned(numDocsFetched); + scanCost2.addEntriesScannedPostFilter((long) numDocsFetched * numColumnsProjected); + } } _numEntriesScannedPostFilter = (long) _numDocsScanned * numColumnsProjected; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByDescOperation.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByDescOperation.java index fb318d85be38..df4f624dedf6 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByDescOperation.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByDescOperation.java @@ -30,6 +30,7 @@ import org.apache.pinot.core.operator.blocks.ValueBlock; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.segment.spi.IndexSegment; +import org.apache.pinot.spi.query.QueryScanCostContext; /** @@ -71,6 +72,12 @@ protected List fetch(Supplier listBuilderSupplier) { IntFunction rowFetcher = fetchBlock(valueBlock, blockValSets); int numDocsFetched = valueBlock.getNumDocs(); _numDocsScanned += numDocsFetched; + QueryScanCostContext scanCost = getScanCostContext(); + if (scanCost != null) { + scanCost.addDocsScanned(numDocsFetched); + scanCost.addEntriesScannedPostFilter( + (long) numDocsFetched * _projectOperator.getNumColumnsProjected()); + } ListBuilder listBuilder = listBuilderSupplier.get(); // first, calculate the best rows on this block diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByLinearOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByLinearOperator.java index eaedac0003ca..57237fc1ebc5 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByLinearOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByLinearOperator.java @@ -28,6 +28,7 @@ import org.apache.pinot.core.operator.blocks.ValueBlock; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.segment.spi.IndexSegment; +import org.apache.pinot.spi.query.QueryScanCostContext; /** @@ -70,6 +71,12 @@ protected List fetch(Supplier listBuilderSupplier) { IntFunction rowFetcher = fetchBlock(valueBlock, blockValSets); int numDocsFetched = valueBlock.getNumDocs(); _numDocsScanned += numDocsFetched; + QueryScanCostContext scanCost = getScanCostContext(); + if (scanCost != null) { + scanCost.addDocsScanned(numDocsFetched); + scanCost.addEntriesScannedPostFilter( + (long) numDocsFetched * _projectOperator.getNumColumnsProjected()); + } for (int i = 0; i < numDocsFetched; i++) { if (listBuilder.add(rowFetcher.apply(i))) { return listBuilder.build(); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyOperator.java index bb5171c44d7c..9c498a267164 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyOperator.java @@ -36,6 +36,7 @@ import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.segment.spi.IndexSegment; +import org.apache.pinot.spi.query.QueryScanCostContext; import org.roaringbitmap.RoaringBitmap; @@ -122,6 +123,12 @@ protected SelectionResultsBlock getNextBlock() { } } _numDocsScanned += numDocs; + QueryScanCostContext scanCost = getScanCostContext(); + if (scanCost != null) { + scanCost.addDocsScanned(numDocs); + scanCost.addEntriesScannedPostFilter( + (long) numDocs * _projectOperator.getNumColumnsProjected()); + } return new SelectionResultsBlock(_dataSchema, rows, _queryContext); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java index 6e1ef3c15090..ca6c6958d5b4 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java @@ -59,6 +59,8 @@ import org.apache.pinot.core.plan.maker.PlanMaker; import org.apache.pinot.core.query.aggregation.function.AggregationFunction; import org.apache.pinot.core.query.config.SegmentPrunerConfig; +import org.apache.pinot.core.query.killing.QueryKillingManager; +import org.apache.pinot.core.query.killing.QueryKillingStrategy; import org.apache.pinot.core.query.pruner.SegmentPrunerService; import org.apache.pinot.core.query.pruner.SegmentPrunerStatistics; import org.apache.pinot.core.query.request.ServerQueryRequest; @@ -68,17 +70,23 @@ import org.apache.pinot.core.query.utils.idset.IdSet; import org.apache.pinot.core.util.trace.TraceContext; import org.apache.pinot.segment.local.data.manager.SegmentDataManager; +import org.apache.pinot.segment.local.data.manager.TableDataManager; import org.apache.pinot.segment.spi.AggregationFunctionType; import org.apache.pinot.segment.spi.IndexSegment; import org.apache.pinot.segment.spi.SegmentContext; +import org.apache.pinot.spi.config.table.QueryConfig; +import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.exception.QueryCancelledException; import org.apache.pinot.spi.exception.QueryErrorCode; import org.apache.pinot.spi.exception.QueryException; import org.apache.pinot.spi.plugin.PluginManager; +import org.apache.pinot.spi.query.QueryExecutionContext; +import org.apache.pinot.spi.query.QueryScanCostContext; import org.apache.pinot.spi.query.QueryThreadContext; import org.apache.pinot.spi.trace.Tracer; import org.apache.pinot.spi.trace.Tracing; +import org.apache.pinot.spi.utils.CommonConstants.Accounting.ScanKillingMode; import org.apache.pinot.spi.utils.CommonConstants.Server; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -176,6 +184,9 @@ private InstanceResponseBlock executeInternal(ServerQueryRequest queryRequest, E queryContext.setEnablePrefetch(_enablePrefetch); + // Initialize scan-based query killing for this query + initScanBasedKilling(queryRequest, tableNameWithType); + // Query scheduler wait time already exceeds query timeout, directly return long querySchedulingTimeMs = System.currentTimeMillis() - queryArrivalTimeMs; if (querySchedulingTimeMs >= queryTimeoutMs) { @@ -520,6 +531,50 @@ private void handleSubquery(ExpressionContext expression, TableExecutionInfo exe } } + /** + * Initializes scan-based query killing for this query. Sets up a {@link QueryScanCostContext} + * on the current thread's {@link QueryExecutionContext} so operators can push scan deltas, and + * caches the resolved per-query strategy so table-level overrides are applied only once. + */ + private void initScanBasedKilling(ServerQueryRequest queryRequest, String tableNameWithType) { + QueryKillingManager killingManager = QueryKillingManager.getInstance(); + if (killingManager == null) { + return; + } + QueryThreadContext ctx = QueryThreadContext.getIfAvailable(); + if (ctx == null) { + return; + } + QueryExecutionContext execCtx = ctx.getExecutionContext(); + execCtx.setTableName(tableNameWithType); + execCtx.setQueryId(queryRequest.getQueryId()); + execCtx.setQueryScanCostContext(new QueryScanCostContext()); + + // Resolve and cache per-query strategy (applies table-level threshold overrides) + QueryConfig queryConfig = null; + TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(tableNameWithType); + if (tableDataManager != null) { + TableConfig tableConfig = tableDataManager.getCachedTableConfigAndSchema().getLeft(); + if (tableConfig != null) { + queryConfig = tableConfig.getQueryConfig(); + } + } + QueryKillingStrategy queryStrategy = killingManager.resolveQueryStrategy(queryConfig); + if (queryStrategy != null) { + execCtx.setCachedKillingStrategy(queryStrategy); + } + // Resolve and store per-table kill mode override (null = use cluster mode) + if (queryConfig != null && queryConfig.getScanKillingMode() != null) { + ScanKillingMode tableMode = ScanKillingMode.fromConfigValue(queryConfig.getScanKillingMode()); + if (tableMode != null) { + execCtx.setEffectiveScanKillingMode(tableMode); + } else { + LOGGER.warn("Invalid scanKillingMode '{}' in QueryConfig for table {}, falling back to cluster mode", + queryConfig.getScanKillingMode(), tableNameWithType); + } + } + } + private void addPrunerStats(InstanceResponseBlock instanceResponse, SegmentPrunerStatistics prunerStats) { instanceResponse.addMetadata(MetadataKey.NUM_SEGMENTS_PRUNED_INVALID.getName(), String.valueOf(prunerStats.getInvalidSegments())); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/killing/CompositeQueryKillingStrategy.java b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/CompositeQueryKillingStrategy.java index aa363d4d3608..fd596ae86fa2 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/killing/CompositeQueryKillingStrategy.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/CompositeQueryKillingStrategy.java @@ -73,10 +73,10 @@ public boolean shouldTerminate(QueryScanCostContext ctx) { @Override public QueryKillReport buildKillReport(QueryScanCostContext ctx, - String queryId, String tableName, String configSource) { + long requestId, String queryId, String tableName, String configSource) { for (QueryKillingStrategy s : _strategies) { if (s.shouldTerminate(ctx)) { - return s.buildKillReport(ctx, queryId, tableName, configSource); + return s.buildKillReport(ctx, requestId, queryId, tableName, configSource); } } throw new IllegalStateException("buildKillReport called but no strategy triggered"); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillReport.java b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillReport.java index d44d873188dc..f98658314248 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillReport.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillReport.java @@ -25,6 +25,7 @@ * Immutable snapshot of a query-kill event */ public final class QueryKillReport { + private final long _requestId; private final String _queryId; private final String _tableName; private final String _strategyName; @@ -40,6 +41,7 @@ public final class QueryKillReport { /** * Creates a {@code QueryKillReport} by snapshotting the current state of {@code context}. * + * @param requestId server request ID for tracing * @param queryId unique identifier of the killed query * @param tableName fully-qualified table name (e.g. {@code myTable_OFFLINE}) * @param strategyName name of the kill strategy that triggered the kill @@ -49,8 +51,9 @@ public final class QueryKillReport { * @param configSource source of the threshold config (e.g. {@code TABLE_CONFIG}) * @param context live scan-cost context; values are snapshotted immediately */ - public QueryKillReport(String queryId, String tableName, String strategyName, String triggeringMetric, + public QueryKillReport(long requestId, String queryId, String tableName, String strategyName, String triggeringMetric, long actualValue, long thresholdValue, String configSource, QueryScanCostContext context) { + _requestId = requestId; _queryId = queryId; _tableName = tableName; _strategyName = strategyName; @@ -67,6 +70,10 @@ public QueryKillReport(String queryId, String tableName, String strategyName, St // ----- Getters ----- + public long getRequestId() { + return _requestId; + } + public String getQueryId() { return _queryId; } @@ -121,13 +128,13 @@ public long getElapsedTimeMs() { */ public String toCustomerMessage() { return String.format( - "Query '%s' on table '%s' was killed because '%s' (%,d) exceeded the threshold (%,d) " + "Query '%s' (requestId=%d) on table '%s' was killed because '%s' (%,d) exceeded the threshold (%,d) " + "configured in %s. " + "At kill time: entriesScannedInFilter=%,d, docsScanned=%,d, " + "entriesScannedPostFilter=%,d, elapsedMs=%d. " + "To reduce scan cost, consider adding a missing index (e.g. inverted or range index) " + "on the filter columns.", - _queryId, _tableName, _triggeringMetric, _actualValue, _thresholdValue, _configSource, + _queryId, _requestId, _tableName, _triggeringMetric, _actualValue, _thresholdValue, _configSource, _snapshotEntriesScannedInFilter, _snapshotDocsScanned, _snapshotEntriesScannedPostFilter, _elapsedTimeMs); } @@ -138,10 +145,10 @@ public String toCustomerMessage() { */ public String toInternalLogMessage() { return String.format( - "QUERY_KILLED queryId=%s table=%s strategy=%s metric=%s actual=%d threshold=%d " + "QUERY_KILLED requestId=%d queryId=%s table=%s strategy=%s metric=%s actual=%d threshold=%d " + "configSource=%s entriesScannedInFilter=%d docsScanned=%d " + "entriesScannedPostFilter=%d elapsedMs=%d", - _queryId, _tableName, _strategyName, _triggeringMetric, _actualValue, _thresholdValue, + _requestId, _queryId, _tableName, _strategyName, _triggeringMetric, _actualValue, _thresholdValue, _configSource, _snapshotEntriesScannedInFilter, _snapshotDocsScanned, _snapshotEntriesScannedPostFilter, _elapsedTimeMs); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingManager.java b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingManager.java index ee8f0678113b..ea7da6ff843b 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingManager.java @@ -18,15 +18,22 @@ */ package org.apache.pinot.core.query.killing; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; import org.apache.pinot.common.metrics.ServerMeter; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.core.accounting.QueryMonitorConfig; import org.apache.pinot.core.query.killing.strategy.ScanEntriesThresholdStrategy; +import org.apache.pinot.spi.config.provider.PinotClusterConfigChangeListener; import org.apache.pinot.spi.config.table.QueryConfig; +import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.query.QueryExecutionContext; import org.apache.pinot.spi.query.QueryScanCostContext; +import org.apache.pinot.spi.utils.CommonConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,12 +42,12 @@ * Central manager for scan-based query killing. Owns the guard rails and delegates the * actual kill decision to a {@link QueryKillingStrategy}. * - * The default factory is {@link ScanEntriesThresholdStrategy.Factory}, which reads - * scan thresholds from {@link QueryMonitorConfig}. Custom factories can be configured - * via {@code accounting.scan.based.killing.strategy.factory.class.name}. + *

The strategy is built once at init via a {@link QueryKillingStrategyFactory} and rebuilt when + * cluster config changes via {@link #onChange}. The default factory is + * {@link ScanEntriesThresholdStrategy.Factory}.

* */ -public class QueryKillingManager { +public class QueryKillingManager implements PinotClusterConfigChangeListener { private static final Logger LOGGER = LoggerFactory.getLogger(QueryKillingManager.class); private static volatile QueryKillingManager _instance; @@ -64,10 +71,14 @@ public QueryKillingManager(AtomicReference configRef, Server * Initializes the singleton instance and builds the strategy from config. * Called once during server startup. */ - public static void init(AtomicReference configRef, ServerMetrics serverMetrics) { + public static QueryKillingManager init(PinotConfiguration schedulerConfig, ServerMetrics serverMetrics) { + long maxHeapSize = Runtime.getRuntime().maxMemory(); + QueryMonitorConfig config = new QueryMonitorConfig(schedulerConfig, maxHeapSize); + AtomicReference configRef = new AtomicReference<>(config); QueryKillingManager manager = new QueryKillingManager(configRef, serverMetrics); manager.rebuildStrategy(); _instance = manager; + return manager; } @Nullable @@ -129,6 +140,68 @@ public QueryKillingStrategy getActiveStrategy() { return _strategy; } + /** + * Handles ZK cluster config changes. Rebuilds the {@link QueryMonitorConfig} from the delta + * and refreshes the killing strategy if scan-killing-related keys changed. + * + *

Raw ZK keys arrive with the full {@value CommonConstants#PINOT_QUERY_SCHEDULER_PREFIX} + * prefix. We strip it before passing to {@link QueryMonitorConfig}, matching the key space + * the init constructor uses (which reads from a config already subsetted to that prefix).

+ */ + @Override + public synchronized void onChange(Set changedConfigs, Map clusterConfigs) { + String prefix = CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "."; + int prefixLen = prefix.length(); + + Set filteredChangedConfigs = new HashSet<>(); + for (String key : changedConfigs) { + if (key.startsWith(prefix)) { + filteredChangedConfigs.add(key.substring(prefixLen)); + } + } + + if (filteredChangedConfigs.isEmpty()) { + return; + } + + Map filteredClusterConfigs = new HashMap<>(); + for (Map.Entry entry : clusterConfigs.entrySet()) { + if (entry.getKey().startsWith(prefix)) { + filteredClusterConfigs.put(entry.getKey().substring(prefixLen), entry.getValue()); + } + } + + QueryMonitorConfig oldConfig = _configRef.get(); + QueryMonitorConfig newConfig = new QueryMonitorConfig(oldConfig, filteredChangedConfigs, filteredClusterConfigs); + _configRef.set(newConfig); + rebuildStrategy(); + LOGGER.info("Scan-based killing config updated: mode={}, maxEntriesScannedInFilter={}, " + + "maxDocsScanned={}, maxEntriesScannedPostFilter={}", + newConfig.getScanBasedKillingMode(), + newConfig.getScanBasedKillingMaxEntriesScannedInFilter(), + newConfig.getScanBasedKillingMaxDocsScanned(), + newConfig.getScanBasedKillingMaxEntriesScannedPostFilter()); + } + + /** + * Convenience overload called from {@link org.apache.pinot.core.operator.BaseOperator#checkTermination()}. + * Reads query context (table name, query id, cached strategy) from the execution context. + */ + public void checkAndKillIfNeeded(QueryExecutionContext executionContext, QueryScanCostContext scanCostContext) { + Object cached = executionContext.getCachedKillingStrategy(); + QueryKillingStrategy cachedStrategy; + if (cached instanceof QueryKillingStrategy) { + cachedStrategy = (QueryKillingStrategy) cached; + } else { + if (cached != null) { + LOGGER.warn("Unexpected cached killing strategy type: {}", cached.getClass().getName()); + } + cachedStrategy = null; + } + checkAndKillIfNeeded(executionContext, scanCostContext, cachedStrategy, + executionContext.getQueryId(), executionContext.getTableName()); + } + /** * Evaluates whether the query should be killed based on the active strategy. * @@ -138,7 +211,6 @@ public QueryKillingStrategy getActiveStrategy() { public void checkAndKillIfNeeded(QueryExecutionContext executionContext, QueryScanCostContext scanCostContext, String queryId, String tableName, @Nullable QueryConfig queryConfig) { - // no strategy means killing is disabled or unconfigured QueryKillingStrategy strategy = _strategy; if (strategy == null) { return; @@ -149,37 +221,100 @@ public void checkAndKillIfNeeded(QueryExecutionContext executionContext, return; } - // Prevent duplicate kills if (executionContext.getTerminateException() != null) { return; } try { - // Resolve per-query table overrides (returns same instance if no overrides) QueryKillingStrategy queryStrategy = strategy.forQuery(queryConfig, config); - String configSource = (queryStrategy != strategy) ? "table:" + tableName : "cluster"; + checkAndKillWithStrategy(executionContext, scanCostContext, queryStrategy, configSource, queryId, tableName, + config); + } catch (Exception e) { + LOGGER.error("Error in scan-based killing evaluation for query {}", queryId, e); + emitKillMetric(ServerMeter.QUERIES_KILLED_SCAN_ERROR, tableName); + } + } - // Delegate to strategy - if (!queryStrategy.shouldTerminate(scanCostContext)) { - return; - } + /** + * Resolves a per-query strategy (applying table-level overrides from {@code queryConfig}). + * Returns null if killing is disabled. Caches the resolved strategy on the execution context. + */ + @Nullable + public QueryKillingStrategy resolveQueryStrategy(@Nullable QueryConfig queryConfig) { + QueryKillingStrategy strategy = _strategy; + if (strategy == null) { + return null; + } + QueryMonitorConfig config = _configRef.get(); + if (config == null || !config.isScanBasedKillingEnabled()) { + return null; + } + return strategy.forQuery(queryConfig, config); + } - QueryKillReport report = queryStrategy.buildKillReport( - scanCostContext, queryId, tableName, configSource); + private void checkAndKillIfNeeded(QueryExecutionContext executionContext, QueryScanCostContext scanCostContext, + @Nullable QueryKillingStrategy cachedStrategy, @Nullable String queryId, @Nullable String tableName) { + QueryKillingStrategy currentStrategy = _strategy; + QueryKillingStrategy strategy = cachedStrategy != null ? cachedStrategy : currentStrategy; + if (strategy == null) { + return; + } + QueryMonitorConfig config = _configRef.get(); + if (config == null || !config.isScanBasedKillingEnabled()) { + return; + } + if (executionContext.getTerminateException() != null) { + return; + } + String resolvedQueryId = queryId != null ? queryId : "unknown"; + String resolvedTableName = tableName != null ? tableName : "unknown"; + String configSource = (cachedStrategy != null && cachedStrategy != currentStrategy) ? "table:" + resolvedTableName + : "cluster"; + try { + checkAndKillWithStrategy(executionContext, scanCostContext, strategy, configSource, resolvedQueryId, + resolvedTableName, config); + } catch (Exception e) { + LOGGER.error("Error in scan-based killing evaluation for query {}", resolvedQueryId, e); + emitKillMetric(ServerMeter.QUERIES_KILLED_SCAN_ERROR, resolvedTableName); + } + } - if (config.isScanBasedKillingLogOnly()) { - LOGGER.info("Query killed in LogOnly mode: {}", report.toInternalLogMessage()); - _serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES_KILLED_SCAN_DRY_RUN, 1); - return; - } + private void checkAndKillWithStrategy(QueryExecutionContext executionContext, QueryScanCostContext scanCostContext, + QueryKillingStrategy queryStrategy, String configSource, String queryId, String tableName, + QueryMonitorConfig config) { + if (!queryStrategy.shouldTerminate(scanCostContext)) { + return; + } + // Resolve effective mode: per-table override takes precedence over cluster config + CommonConstants.Accounting.ScanKillingMode effectiveMode = executionContext.getEffectiveScanKillingMode(); + if (effectiveMode == CommonConstants.Accounting.ScanKillingMode.DISABLED) { + return; + } + boolean logOnly = effectiveMode == CommonConstants.Accounting.ScanKillingMode.LOG_ONLY + || (effectiveMode == null && config.isScanBasedKillingLogOnly()); + long requestId = executionContext.getRequestId(); + QueryKillReport report = queryStrategy.buildKillReport(scanCostContext, requestId, queryId, tableName, + configSource); + if (logOnly) { + LOGGER.info("Query killed in LogOnly mode: {}", report.toInternalLogMessage()); + emitKillMetric(ServerMeter.QUERIES_KILLED_SCAN_DRY_RUN, report.getTableName()); + return; + } + LOGGER.warn("Query Killed in enforce mode: {}", report.toInternalLogMessage()); + executionContext.terminate(queryStrategy.getErrorCode(), report.toCustomerMessage()); + emitKillMetric(ServerMeter.QUERIES_KILLED_SCAN, report.getTableName()); + } - LOGGER.warn("Query Killed in enforce mode: {}", report.toInternalLogMessage()); - executionContext.terminate(queryStrategy.getErrorCode(), report.toCustomerMessage()); - _serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES_KILLED_SCAN, 1); - } catch (Exception e) { - LOGGER.error("Error in scan-based killing evaluation for query {}", queryId, e); - _serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES_KILLED_SCAN_ERROR, 1); + /** + * Emits a kill metric per-table when the table name is known, falling back to global emission + * when it is not. + */ + private void emitKillMetric(ServerMeter meter, @Nullable String tableName) { + if (tableName != null && !tableName.isEmpty() && !"unknown".equals(tableName)) { + _serverMetrics.addMeteredTableValue(tableName, meter, 1); + } else { + _serverMetrics.addMeteredGlobalValue(meter, 1); } } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingStrategy.java b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingStrategy.java index 4bf4390d59e9..c5e1248689f8 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingStrategy.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingStrategy.java @@ -47,7 +47,7 @@ public interface QueryKillingStrategy { * Only called when {@link #shouldTerminate} returns true. */ QueryKillReport buildKillReport(QueryScanCostContext context, - String queryId, String tableName, String configSource); + long requestId, String queryId, String tableName, String configSource); /** Error code for the termination response. */ default QueryErrorCode getErrorCode() { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/killing/strategy/ScanEntriesThresholdStrategy.java b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/strategy/ScanEntriesThresholdStrategy.java index 92b13dcdc607..12adba92f13b 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/killing/strategy/ScanEntriesThresholdStrategy.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/killing/strategy/ScanEntriesThresholdStrategy.java @@ -49,10 +49,17 @@ public class ScanEntriesThresholdStrategy implements QueryKillingStrategy { private final long _maxEntriesScannedInFilter; private final long _maxDocsScanned; + private final long _maxEntriesScannedPostFilter; public ScanEntriesThresholdStrategy(long maxEntriesScannedInFilter, long maxDocsScanned) { + this(maxEntriesScannedInFilter, maxDocsScanned, Long.MAX_VALUE); + } + + public ScanEntriesThresholdStrategy(long maxEntriesScannedInFilter, long maxDocsScanned, + long maxEntriesScannedPostFilter) { _maxEntriesScannedInFilter = maxEntriesScannedInFilter; _maxDocsScanned = maxDocsScanned; + _maxEntriesScannedPostFilter = maxEntriesScannedPostFilter; } @Override @@ -60,12 +67,14 @@ public boolean shouldTerminate(QueryScanCostContext ctx) { return (_maxEntriesScannedInFilter < Long.MAX_VALUE && ctx.getNumEntriesScannedInFilter() > _maxEntriesScannedInFilter) || (_maxDocsScanned < Long.MAX_VALUE - && ctx.getNumDocsScanned() > _maxDocsScanned); + && ctx.getNumDocsScanned() > _maxDocsScanned) + || (_maxEntriesScannedPostFilter < Long.MAX_VALUE + && ctx.getNumEntriesScannedPostFilter() > _maxEntriesScannedPostFilter); } @Override public QueryKillReport buildKillReport(QueryScanCostContext ctx, - String queryId, String tableName, String configSource) { + long requestId, String queryId, String tableName, String configSource) { String triggeringMetric; long actualValue; long thresholdValue; @@ -74,12 +83,17 @@ public QueryKillReport buildKillReport(QueryScanCostContext ctx, triggeringMetric = "numEntriesScannedInFilter"; actualValue = ctx.getNumEntriesScannedInFilter(); thresholdValue = _maxEntriesScannedInFilter; - } else { + } else if (_maxDocsScanned < Long.MAX_VALUE + && ctx.getNumDocsScanned() > _maxDocsScanned) { triggeringMetric = "numDocsScanned"; actualValue = ctx.getNumDocsScanned(); thresholdValue = _maxDocsScanned; + } else { + triggeringMetric = "numEntriesScannedPostFilter"; + actualValue = ctx.getNumEntriesScannedPostFilter(); + thresholdValue = _maxEntriesScannedPostFilter; } - return new QueryKillReport(queryId, tableName, STRATEGY_NAME, + return new QueryKillReport(requestId, queryId, tableName, STRATEGY_NAME, triggeringMetric, actualValue, thresholdValue, configSource, ctx); } @@ -101,12 +115,14 @@ public QueryKillingStrategy forQuery(@Nullable QueryConfig queryConfig, } Long tableEntries = queryConfig.getMaxEntriesScannedInFilter(); Long tableDocs = queryConfig.getMaxDocsScanned(); - if (tableEntries == null && tableDocs == null) { + Long tablePostFilter = queryConfig.getMaxEntriesScannedPostFilter(); + if (tableEntries == null && tableDocs == null && tablePostFilter == null) { return this; } return new ScanEntriesThresholdStrategy( tableEntries != null ? tableEntries : _maxEntriesScannedInFilter, - tableDocs != null ? tableDocs : _maxDocsScanned); + tableDocs != null ? tableDocs : _maxDocsScanned, + tablePostFilter != null ? tablePostFilter : _maxEntriesScannedPostFilter); } public long getMaxEntriesScannedInFilter() { @@ -117,6 +133,10 @@ public long getMaxDocsScanned() { return _maxDocsScanned; } + public long getMaxEntriesScannedPostFilter() { + return _maxEntriesScannedPostFilter; + } + /** * Factory that creates a {@link ScanEntriesThresholdStrategy} from * {@link QueryMonitorConfig}. This is the default factory used when no custom @@ -134,19 +154,23 @@ public static class Factory implements QueryKillingStrategyFactory { public QueryKillingStrategy create(QueryMonitorConfig config) { long maxEntries = config.getScanBasedKillingMaxEntriesScannedInFilter(); long maxDocs = config.getScanBasedKillingMaxDocsScanned(); + long maxPostFilter = config.getScanBasedKillingMaxEntriesScannedPostFilter(); - if (maxEntries == Long.MAX_VALUE && maxDocs == Long.MAX_VALUE) { + if (maxEntries == Long.MAX_VALUE && maxDocs == Long.MAX_VALUE && maxPostFilter == Long.MAX_VALUE) { LOGGER.warn("Scan-based killing is enabled but no thresholds are configured. " + "Set at least one of: accounting.scan.based.killing.max.entries.scanned.in.filter, " - + "accounting.scan.based.killing.max.docs.scanned. " + + "accounting.scan.based.killing.max.docs.scanned, " + + "accounting.scan.based.killing.max.entries.scanned.post.filter. " + "Scan-based killing will be effectively disabled until thresholds are set."); return null; } - LOGGER.info("Initialized ScanEntriesThresholdStrategy with maxEntriesScannedInFilter={}, maxDocsScanned={}", + LOGGER.info("Initialized ScanEntriesThresholdStrategy with maxEntriesScannedInFilter={}, " + + "maxDocsScanned={}, maxEntriesScannedPostFilter={}", maxEntries == Long.MAX_VALUE ? "disabled" : maxEntries, - maxDocs == Long.MAX_VALUE ? "disabled" : maxDocs); - return new ScanEntriesThresholdStrategy(maxEntries, maxDocs); + maxDocs == Long.MAX_VALUE ? "disabled" : maxDocs, + maxPostFilter == Long.MAX_VALUE ? "disabled" : maxPostFilter); + return new ScanEntriesThresholdStrategy(maxEntries, maxDocs, maxPostFilter); } @Override diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/query/OperatorScanCostTrackingTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/query/OperatorScanCostTrackingTest.java new file mode 100644 index 000000000000..cf180bf85f2a --- /dev/null +++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/query/OperatorScanCostTrackingTest.java @@ -0,0 +1,207 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.operator.query; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.List; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.core.operator.BaseProjectOperator; +import org.apache.pinot.core.plan.DocIdSetPlanNode; +import org.apache.pinot.core.plan.ProjectPlanNode; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils; +import org.apache.pinot.core.query.selection.SelectionOperatorUtils; +import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader; +import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl; +import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader; +import org.apache.pinot.segment.spi.IndexSegment; +import org.apache.pinot.segment.spi.SegmentContext; +import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.query.QueryScanCostContext; +import org.apache.pinot.spi.query.QueryThreadContext; +import org.apache.pinot.spi.utils.ReadMode; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + + +/** + * Verifies that query operators correctly push scan cost metrics into {@link QueryScanCostContext} + */ +public class OperatorScanCostTrackingTest { + private static final String RAW_TABLE_NAME = "scanCostTestTable"; + private static final String SEGMENT_NAME = "scanCostTestSegment"; + private static final int NUM_ROWS = 100; + private static final String COL_INT = "intCol"; + private static final String COL_STRING = "stringCol"; + private static final String COL_DOUBLE = "doubleCol"; + + private static final TableConfig TABLE_CONFIG = + new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build(); + private static final Schema SCHEMA = new Schema.SchemaBuilder() + .addSingleValueDimension(COL_INT, FieldSpec.DataType.INT) + .addSingleValueDimension(COL_STRING, FieldSpec.DataType.STRING) + .addMetric(COL_DOUBLE, FieldSpec.DataType.DOUBLE) + .build(); + + private File _tempDir; + private IndexSegment _segment; + + @BeforeClass + public void setUp() + throws Exception { + _tempDir = Files.createTempDirectory("OperatorScanCostTrackingTest").toFile(); + + List records = new ArrayList<>(NUM_ROWS); + for (int i = 0; i < NUM_ROWS; i++) { + GenericRow row = new GenericRow(); + row.putValue(COL_INT, i); + row.putValue(COL_STRING, "val_" + (i % 10)); + row.putValue(COL_DOUBLE, i * 1.5); + records.add(row); + } + + SegmentGeneratorConfig config = new SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA); + config.setTableName(RAW_TABLE_NAME); + config.setSegmentName(SEGMENT_NAME); + config.setOutDir(_tempDir.getPath()); + + SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl(); + driver.init(config, new GenericRowRecordReader(records)); + driver.build(); + + _segment = ImmutableSegmentLoader.load(new File(_tempDir, SEGMENT_NAME), ReadMode.mmap); + } + + @Test + public void testSelectionOnlyOperatorTracksScanCost() { + QueryContext queryContext = QueryContextConverterUtils.getQueryContext( + "SELECT intCol, stringCol, doubleCol FROM scanCostTestTable LIMIT 50"); + + try (QueryThreadContext ignore = QueryThreadContext.openForSseTest()) { + QueryScanCostContext scanCost = new QueryScanCostContext(); + QueryThreadContext.get().getExecutionContext().setQueryScanCostContext(scanCost); + + List expressions = + SelectionOperatorUtils.extractExpressions(queryContext, _segment); + BaseProjectOperator projectOperator = + new ProjectPlanNode(new SegmentContext(_segment), queryContext, expressions, + DocIdSetPlanNode.MAX_DOC_PER_CALL).run(); + int numColumnsProjected = projectOperator.getNumColumnsProjected(); + + SelectionOnlyOperator operator = + new SelectionOnlyOperator(_segment, queryContext, expressions, projectOperator); + operator.nextBlock(); + + long docsScanned = scanCost.getNumDocsScanned(); + assertTrue(docsScanned > 0, "Should have scanned some docs"); + assertEquals(scanCost.getNumEntriesScannedPostFilter(), docsScanned * numColumnsProjected); + } + } + + @Test + public void testSelectionOrderByOperatorTracksScanCost() { + QueryContext queryContext = QueryContextConverterUtils.getQueryContext( + "SELECT intCol, stringCol FROM scanCostTestTable ORDER BY intCol LIMIT 10"); + + try (QueryThreadContext ignore = QueryThreadContext.openForSseTest()) { + QueryScanCostContext scanCost = new QueryScanCostContext(); + QueryThreadContext.get().getExecutionContext().setQueryScanCostContext(scanCost); + + List expressions = + SelectionOperatorUtils.extractExpressions(queryContext, _segment); + BaseProjectOperator projectOperator = + new ProjectPlanNode(new SegmentContext(_segment), queryContext, expressions, + DocIdSetPlanNode.MAX_DOC_PER_CALL).run(); + int numColumnsProjected = projectOperator.getNumColumnsProjected(); + + SelectionOrderByOperator operator = + new SelectionOrderByOperator(_segment, queryContext, expressions, projectOperator); + operator.nextBlock(); + + long docsScanned = scanCost.getNumDocsScanned(); + assertTrue(docsScanned > 0, "Should have scanned some docs"); + assertEquals(scanCost.getNumEntriesScannedPostFilter(), docsScanned * numColumnsProjected); + } + } + + @Test + public void testDistinctOperatorTracksScanCost() { + QueryContext queryContext = QueryContextConverterUtils.getQueryContext( + "SELECT DISTINCT stringCol FROM scanCostTestTable"); + + try (QueryThreadContext ignore = QueryThreadContext.openForSseTest()) { + QueryScanCostContext scanCost = new QueryScanCostContext(); + QueryThreadContext.get().getExecutionContext().setQueryScanCostContext(scanCost); + + List expressions = queryContext.getSelectExpressions(); + BaseProjectOperator projectOperator = + new ProjectPlanNode(new SegmentContext(_segment), queryContext, expressions, + DocIdSetPlanNode.MAX_DOC_PER_CALL).run(); + int numColumnsProjected = projectOperator.getNumColumnsProjected(); + + DistinctOperator operator = new DistinctOperator(_segment, queryContext, projectOperator); + operator.nextBlock(); + + long docsScanned = scanCost.getNumDocsScanned(); + assertTrue(docsScanned > 0, "Should have scanned some docs"); + assertEquals(scanCost.getNumEntriesScannedPostFilter(), docsScanned * numColumnsProjected); + } + } + + @Test + public void testScanCostIsZeroWhenContextNotSet() { + QueryContext queryContext = QueryContextConverterUtils.getQueryContext( + "SELECT intCol, stringCol FROM scanCostTestTable LIMIT 10"); + + try (QueryThreadContext ignore = QueryThreadContext.openForSseTest()) { + // Intentionally NOT setting QueryScanCostContext — getScanCostContext() returns null + List expressions = + SelectionOperatorUtils.extractExpressions(queryContext, _segment); + BaseProjectOperator projectOperator = + new ProjectPlanNode(new SegmentContext(_segment), queryContext, expressions, + DocIdSetPlanNode.MAX_DOC_PER_CALL).run(); + + SelectionOnlyOperator operator = + new SelectionOnlyOperator(_segment, queryContext, expressions, projectOperator); + operator.nextBlock(); + // No exception — scan cost tracking is gracefully skipped when context is null + } + } + + @AfterClass + public void tearDown() + throws IOException { + _segment.destroy(); + FileUtils.deleteDirectory(_tempDir); + } +} diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/killing/CompositeQueryKillingStrategyTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/killing/CompositeQueryKillingStrategyTest.java index ea48ff498598..6a31a4b8d765 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/query/killing/CompositeQueryKillingStrategyTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/query/killing/CompositeQueryKillingStrategyTest.java @@ -117,7 +117,7 @@ public void testBuildKillReportDelegatesToTriggeringStrategy() { ctx.addDocsScanned(501); assertTrue(composite.shouldTerminate(ctx)); - QueryKillReport report = composite.buildKillReport(ctx, "q1", "t1", "cluster"); + QueryKillReport report = composite.buildKillReport(ctx, 1L, "q1", "t1", "cluster"); assertEquals(report.getTriggeringMetric(), "numDocsScanned"); assertEquals(report.getActualValue(), 501L); } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/killing/QueryKillReportTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/killing/QueryKillReportTest.java index 054197aeaed0..6819a6991be7 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/query/killing/QueryKillReportTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/query/killing/QueryKillReportTest.java @@ -38,6 +38,7 @@ public void testSnapshotsValuesAtCreationTime() { context.addEntriesScannedPostFilter(200L); QueryKillReport report = new QueryKillReport( + 101L, "queryId-1", "myTable_OFFLINE", "EntriesScannedInFilterStrategy", @@ -66,6 +67,7 @@ public void testCustomerMessageContainsAllFields() { context.addEntriesScannedPostFilter(50L); QueryKillReport report = new QueryKillReport( + 202L, "queryId-abc", "salesTable_REALTIME", "EntriesScannedInFilterStrategy", @@ -83,6 +85,7 @@ public void testCustomerMessageContainsAllFields() { assertTrue(msg.contains("1,234,567"), "Should contain actual value with commas"); assertTrue(msg.contains("1,000,000"), "Should contain threshold with commas"); assertTrue(msg.contains("CLUSTER_CONFIG"), "Should contain config source"); + assertTrue(msg.contains("requestId=202"), "Should contain requestId"); assertTrue(msg.contains("missing index") || msg.contains("index"), "Should include advice about missing indexes"); } @@ -95,6 +98,7 @@ public void testInternalLogMessageIsStructured() { context.addEntriesScannedPostFilter(150L); QueryKillReport report = new QueryKillReport( + 303L, "queryId-xyz", "ordersTable_OFFLINE", "DocsScannedStrategy", @@ -108,6 +112,7 @@ public void testInternalLogMessageIsStructured() { String msg = report.toInternalLogMessage(); assertTrue(msg.startsWith("QUERY_KILLED"), "Should start with QUERY_KILLED prefix"); + assertTrue(msg.contains("requestId=303"), "Should have requestId key=value"); assertTrue(msg.contains("queryId=queryId-xyz"), "Should have queryId key=value"); assertTrue(msg.contains("table=ordersTable_OFFLINE"), "Should have table key=value"); assertTrue(msg.contains("metric=numDocsScanned"), "Should have metric key=value"); @@ -124,6 +129,7 @@ public void testGetters() { context.addEntriesScannedPostFilter(25L); QueryKillReport report = new QueryKillReport( + 404L, "queryId-getters", "testTable_OFFLINE", "TestStrategy", @@ -134,6 +140,7 @@ public void testGetters() { context ); + assertEquals(report.getRequestId(), 404L); assertEquals(report.getQueryId(), "queryId-getters"); assertEquals(report.getTableName(), "testTable_OFFLINE"); assertEquals(report.getStrategyName(), "TestStrategy"); diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/killing/QueryKillingManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/killing/QueryKillingManagerTest.java index 5bde697b5a36..ffe44d7f94c5 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/query/killing/QueryKillingManagerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/query/killing/QueryKillingManagerTest.java @@ -19,8 +19,11 @@ package org.apache.pinot.core.query.killing; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import java.util.concurrent.atomic.AtomicReference; +import org.apache.pinot.common.metrics.ServerMeter; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.core.accounting.QueryMonitorConfig; import org.apache.pinot.core.query.killing.strategy.ScanEntriesThresholdStrategy; @@ -30,10 +33,16 @@ import org.apache.pinot.spi.query.QueryExecutionContext; import org.apache.pinot.spi.query.QueryScanCostContext; import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.CommonConstants.Accounting.ScanKillingMode; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; @@ -295,7 +304,391 @@ public void testRebuildStrategyPicksUpConfigChanges() { assertNotNull(manager.getActiveStrategy(), "After config update, strategy should be built"); } - // --- Test fixtures for pluggable strategy --- + // --- onChange (dynamic config reload) --- + + @Test + public void testOnChangeRebuildsStrategy() { + // Start with killing disabled + QueryMonitorConfig disabledConfig = buildConfig("disabled", 100L, Long.MAX_VALUE); + AtomicReference configRef = new AtomicReference<>(disabledConfig); + QueryKillingManager manager = new QueryKillingManager(configRef, _serverMetrics); + manager.rebuildStrategy(); + assertNull(manager.getActiveStrategy(), "Strategy should be null when disabled"); + + // Simulate cluster config change enabling killing with enforce mode + threshold. + // Keys arrive from ZK with full "pinot.query.scheduler." prefix — onChange() strips it. + String prefix = CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "."; + + Set changedKeys = new HashSet<>(); + changedKeys.add(prefix + CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE); + changedKeys.add(prefix + CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_IN_FILTER); + + Map clusterConfigs = new HashMap<>(); + clusterConfigs.put(prefix + CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE, "enforce"); + clusterConfigs.put( + prefix + CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_IN_FILTER, "500"); + + manager.onChange(changedKeys, clusterConfigs); + assertNotNull(manager.getActiveStrategy(), + "Strategy should be rebuilt after onChange enables killing"); + } + + @Test + public void testOnChangeDisablesStrategy() { + // Start with killing enabled + QueryMonitorConfig enabledConfig = buildConfig("enforce", 100L, Long.MAX_VALUE); + AtomicReference configRef = new AtomicReference<>(enabledConfig); + QueryKillingManager manager = new QueryKillingManager(configRef, _serverMetrics); + manager.rebuildStrategy(); + assertNotNull(manager.getActiveStrategy(), "Strategy should be active when enabled"); + + // Simulate cluster config change to disable killing (full ZK-prefixed keys) + String prefix = CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "."; + + Set changedKeys = new HashSet<>(); + changedKeys.add(prefix + CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE); + + Map clusterConfigs = new HashMap<>(); + clusterConfigs.put(prefix + CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE, "disabled"); + + manager.onChange(changedKeys, clusterConfigs); + assertNull(manager.getActiveStrategy(), + "Strategy should be null after onChange disables killing"); + } + + @Test + public void testOnChangeIgnoresIrrelevantKeys() { + // Start with killing enabled + QueryMonitorConfig enabledConfig = buildConfig("enforce", 100L, Long.MAX_VALUE); + AtomicReference configRef = new AtomicReference<>(enabledConfig); + QueryKillingManager manager = new QueryKillingManager(configRef, _serverMetrics); + manager.rebuildStrategy(); + assertNotNull(manager.getActiveStrategy(), "Strategy should be active when enabled"); + + // Simulate a ZK change that only touches non-scheduler keys — should be ignored + Set changedKeys = new HashSet<>(); + changedKeys.add("some.unrelated.config"); + changedKeys.add("helix.rebalance.something"); + + Map clusterConfigs = new HashMap<>(); + clusterConfigs.put("some.unrelated.config", "value"); + + manager.onChange(changedKeys, clusterConfigs); + assertNotNull(manager.getActiveStrategy(), + "Strategy should remain unchanged when no scheduler keys changed"); + } + + // --- Convenience overload (2-arg checkAndKillIfNeeded) --- + + @Test + public void testConvenienceOverloadKillsViaCachedStrategy() { + // Create a manager with killing enabled, threshold = 50 for entries scanned + QueryMonitorConfig config = buildConfig("enforce", 50L, Long.MAX_VALUE); + AtomicReference configRef = new AtomicReference<>(config); + QueryKillingManager manager = new QueryKillingManager(configRef, _serverMetrics); + manager.rebuildStrategy(); + + // Resolve the per-query strategy and cache it on the execution context + QueryKillingStrategy resolvedStrategy = manager.resolveQueryStrategy(null); + assertNotNull(resolvedStrategy); + + QueryExecutionContext execCtx = QueryExecutionContext.forSseTest(); + execCtx.setTableName("testTable_OFFLINE"); + execCtx.setQueryId("conv-q1"); + execCtx.setCachedKillingStrategy(resolvedStrategy); + + // Create scan cost exceeding the threshold + QueryScanCostContext scanCtx = new QueryScanCostContext(); + scanCtx.addEntriesScannedInFilter(100L); // Above threshold of 50 + + // Use the 2-arg convenience overload + manager.checkAndKillIfNeeded(execCtx, scanCtx); + assertNotNull(execCtx.getTerminateException(), + "Query should be terminated via cached strategy when threshold is exceeded"); + } + + @Test + public void testConvenienceOverloadNullScanCostContextNoOp() { + // When the manager's strategy is null (disabled), calling with null scanCostContext is a no-op. + // In production, BaseOperator guards against null scanCostContext before calling the manager. + // Here we verify the manager's early-return when disabled. + QueryMonitorConfig config = buildConfig("disabled", 100L, Long.MAX_VALUE); + AtomicReference configRef = new AtomicReference<>(config); + QueryKillingManager manager = new QueryKillingManager(configRef, _serverMetrics); + manager.rebuildStrategy(); + + QueryExecutionContext execCtx = QueryExecutionContext.forSseTest(); + // No queryScanCostContext set on execCtx — mirrors the real scenario where + // scan-based killing is not initialized for this query. + + // The 2-arg overload with null scanCostContext is safe when strategy is null (disabled) + manager.checkAndKillIfNeeded(execCtx, null); + assertNull(execCtx.getTerminateException(), + "No exception should be set when killing is disabled and scanCostContext is null"); + } + + // --- resolveQueryStrategy --- + + @Test + public void testResolveQueryStrategyReturnsNullWhenDisabled() { + QueryMonitorConfig config = buildConfig("disabled", 100L, Long.MAX_VALUE); + AtomicReference configRef = new AtomicReference<>(config); + QueryKillingManager manager = new QueryKillingManager(configRef, _serverMetrics); + manager.rebuildStrategy(); + + QueryKillingStrategy resolved = manager.resolveQueryStrategy(null); + assertNull(resolved, "resolveQueryStrategy should return null when killing is disabled"); + } + + @Test + public void testResolveQueryStrategyReturnsStrategyWhenEnabled() { + QueryMonitorConfig config = buildConfig("enforce", 200L, Long.MAX_VALUE); + AtomicReference configRef = new AtomicReference<>(config); + QueryKillingManager manager = new QueryKillingManager(configRef, _serverMetrics); + manager.rebuildStrategy(); + + QueryKillingStrategy resolved = manager.resolveQueryStrategy(null); + assertNotNull(resolved, "resolveQueryStrategy should return a strategy when killing is enabled"); + assertTrue(resolved instanceof ScanEntriesThresholdStrategy); + } + + @Test + public void testTableModeEnforceOverridesClusterLogOnly() { + // Cluster is logOnly — normally no kills + QueryMonitorConfig config = buildConfig("logOnly", 50L, Long.MAX_VALUE); + AtomicReference configRef = new AtomicReference<>(config); + QueryKillingManager manager = new QueryKillingManager(configRef, _serverMetrics); + manager.rebuildStrategy(); + + QueryExecutionContext execCtx = QueryExecutionContext.forSseTest(); + // Table override: enforce + execCtx.setEffectiveScanKillingMode(ScanKillingMode.ENFORCE); + execCtx.setTableName("testTable_OFFLINE"); + execCtx.setQueryId("tbl-mode-q1"); + + QueryKillingStrategy strategy = manager.resolveQueryStrategy(null); + assertNotNull(strategy); + execCtx.setCachedKillingStrategy(strategy); + + QueryScanCostContext scanCtx = new QueryScanCostContext(); + scanCtx.addEntriesScannedInFilter(100L); // exceeds cluster threshold of 50 + + manager.checkAndKillIfNeeded(execCtx, scanCtx); + assertNotNull(execCtx.getTerminateException(), + "Table enforce override should cause real termination even when cluster is logOnly"); + assertEquals(execCtx.getTerminateException().getErrorCode(), QueryErrorCode.QUERY_SCAN_LIMIT_EXCEEDED); + } + + @Test + public void testTableModeLogOnlyOverridesClusterEnforce() { + // Cluster is enforce — normally kills + QueryMonitorConfig config = buildConfig("enforce", 50L, Long.MAX_VALUE); + AtomicReference configRef = new AtomicReference<>(config); + QueryKillingManager manager = new QueryKillingManager(configRef, _serverMetrics); + manager.rebuildStrategy(); + + QueryExecutionContext execCtx = QueryExecutionContext.forSseTest(); + // Table override: logOnly — should downgrade to dry-run + execCtx.setEffectiveScanKillingMode(ScanKillingMode.LOG_ONLY); + execCtx.setTableName("testTable_OFFLINE"); + execCtx.setQueryId("tbl-mode-q2"); + + QueryKillingStrategy strategy = manager.resolveQueryStrategy(null); + assertNotNull(strategy); + execCtx.setCachedKillingStrategy(strategy); + + QueryScanCostContext scanCtx = new QueryScanCostContext(); + scanCtx.addEntriesScannedInFilter(100L); // exceeds cluster threshold of 50 + + manager.checkAndKillIfNeeded(execCtx, scanCtx); + assertNull(execCtx.getTerminateException(), + "Table logOnly override should prevent real kill even when cluster is enforce"); + } + + @Test + public void testTableModeDisabledOverridesClusterEnforce() { + // Cluster is enforce — normally kills + QueryMonitorConfig config = buildConfig("enforce", 50L, Long.MAX_VALUE); + AtomicReference configRef = new AtomicReference<>(config); + QueryKillingManager manager = new QueryKillingManager(configRef, _serverMetrics); + manager.rebuildStrategy(); + + QueryExecutionContext execCtx = QueryExecutionContext.forSseTest(); + // Table override: disabled — fully exempt + execCtx.setEffectiveScanKillingMode(ScanKillingMode.DISABLED); + execCtx.setTableName("testTable_OFFLINE"); + execCtx.setQueryId("tbl-mode-q3"); + + QueryKillingStrategy strategy = manager.resolveQueryStrategy(null); + assertNotNull(strategy); + execCtx.setCachedKillingStrategy(strategy); + + QueryScanCostContext scanCtx = new QueryScanCostContext(); + scanCtx.addEntriesScannedInFilter(100L); // exceeds cluster threshold of 50 + + manager.checkAndKillIfNeeded(execCtx, scanCtx); + assertNull(execCtx.getTerminateException(), + "Table disabled override should fully exempt the table from killing"); + } + + @Test + public void testNoTableModeOverrideFallsBackToCluster() { + // Cluster is enforce, no table mode override + QueryMonitorConfig config = buildConfig("enforce", 50L, Long.MAX_VALUE); + AtomicReference configRef = new AtomicReference<>(config); + QueryKillingManager manager = new QueryKillingManager(configRef, _serverMetrics); + manager.rebuildStrategy(); + + QueryExecutionContext execCtx = QueryExecutionContext.forSseTest(); + // No setEffectiveScanKillingMode call — should use cluster mode (enforce) + execCtx.setTableName("testTable_OFFLINE"); + execCtx.setQueryId("tbl-mode-q4"); + + QueryKillingStrategy strategy = manager.resolveQueryStrategy(null); + assertNotNull(strategy); + execCtx.setCachedKillingStrategy(strategy); + + QueryScanCostContext scanCtx = new QueryScanCostContext(); + scanCtx.addEntriesScannedInFilter(100L); // exceeds cluster threshold of 50 + + manager.checkAndKillIfNeeded(execCtx, scanCtx); + assertNotNull(execCtx.getTerminateException(), + "Without table mode override, cluster enforce mode should kill"); + } + + // --- Per-table metric emission --- + + @Test + public void testEnforceKillEmitsPerTableMetric() { + QueryMonitorConfig config = buildConfig("enforce", 100L, Long.MAX_VALUE); + AtomicReference configRef = new AtomicReference<>(config); + QueryKillingManager manager = new QueryKillingManager(configRef, _serverMetrics); + manager.rebuildStrategy(); + + QueryExecutionContext execCtx = QueryExecutionContext.forSseTest(); + QueryScanCostContext scanCtx = new QueryScanCostContext(); + scanCtx.addEntriesScannedInFilter(200L); + + manager.checkAndKillIfNeeded(execCtx, scanCtx, "q-metric-1", "myTable_REALTIME", null); + + verify(_serverMetrics).addMeteredTableValue("myTable_REALTIME", ServerMeter.QUERIES_KILLED_SCAN, 1L); + verify(_serverMetrics, never()).addMeteredGlobalValue(ServerMeter.QUERIES_KILLED_SCAN, 1L); + } + + @Test + public void testLogOnlyKillEmitsPerTableDryRunMetric() { + QueryMonitorConfig config = buildConfig("logOnly", 100L, Long.MAX_VALUE); + AtomicReference configRef = new AtomicReference<>(config); + QueryKillingManager manager = new QueryKillingManager(configRef, _serverMetrics); + manager.rebuildStrategy(); + + QueryExecutionContext execCtx = QueryExecutionContext.forSseTest(); + QueryScanCostContext scanCtx = new QueryScanCostContext(); + scanCtx.addEntriesScannedInFilter(200L); + + manager.checkAndKillIfNeeded(execCtx, scanCtx, "q-metric-2", "myTable_REALTIME", null); + + verify(_serverMetrics).addMeteredTableValue("myTable_REALTIME", ServerMeter.QUERIES_KILLED_SCAN_DRY_RUN, 1L); + verify(_serverMetrics, never()).addMeteredGlobalValue(ServerMeter.QUERIES_KILLED_SCAN_DRY_RUN, 1L); + // logOnly should not actually terminate + assertNull(execCtx.getTerminateException()); + } + + @Test + public void testNullTableNameFallsBackToGlobalErrorMetric() { + // When the strategy throws inside checkAndKillIfNeeded, the catch block emits the error + // metric. With a null table name, the helper falls back to global emission so we do not + // silently drop the error signal. + QueryMonitorConfig config = buildConfig("enforce", 100L, Long.MAX_VALUE); + AtomicReference configRef = new AtomicReference<>(config); + QueryKillingManager manager = new QueryKillingManager(configRef, _serverMetrics); + manager.rebuildStrategy(); + + QueryExecutionContext execCtx = QueryExecutionContext.forSseTest(); + // Force the catch path with a cached strategy that throws inside shouldTerminate. + execCtx.setCachedKillingStrategy(new QueryKillingStrategy() { + @Override + public boolean shouldTerminate(QueryScanCostContext context) { + throw new RuntimeException("boom"); + } + + @Override + public QueryKillReport buildKillReport(QueryScanCostContext context, long requestId, + String queryId, String tableName, String configSource) { + // unused — shouldTerminate throws first + throw new UnsupportedOperationException(); + } + }); + execCtx.setTableName(null); + execCtx.setQueryId("q-metric-null"); + + manager.checkAndKillIfNeeded(execCtx, new QueryScanCostContext()); + + // Null table name → global fallback emission, never per-table + verify(_serverMetrics).addMeteredGlobalValue(ServerMeter.QUERIES_KILLED_SCAN_ERROR, 1L); + verify(_serverMetrics, never()).addMeteredTableValue(anyString(), + eq(ServerMeter.QUERIES_KILLED_SCAN_ERROR), anyLong()); + } + + @Test + public void testNullTableNameInReportFallsBackToGlobalEnforceMetric() { + QueryMonitorConfig config = buildConfig("enforce", 100L, Long.MAX_VALUE); + AtomicReference configRef = new AtomicReference<>(config); + QueryKillingManager manager = new QueryKillingManager(configRef, _serverMetrics); + manager.rebuildStrategy(); + + QueryExecutionContext execCtx = QueryExecutionContext.forSseTest(); + execCtx.setCachedKillingStrategy(new QueryKillingStrategy() { + @Override + public boolean shouldTerminate(QueryScanCostContext context) { + return true; + } + + @Override + public QueryKillReport buildKillReport(QueryScanCostContext context, long requestId, + String queryId, String tableName, String configSource) { + // Intentionally drop the table name to exercise the null-fallback path + return new QueryKillReport(requestId, queryId, null, "TestStrategy", "test", 0, 0, + configSource, context); + } + + @Override + public org.apache.pinot.spi.exception.QueryErrorCode getErrorCode() { + return org.apache.pinot.spi.exception.QueryErrorCode.QUERY_SCAN_LIMIT_EXCEEDED; + } + }); + execCtx.setTableName(null); + execCtx.setQueryId("q-null-report"); + + manager.checkAndKillIfNeeded(execCtx, new QueryScanCostContext()); + + verify(_serverMetrics).addMeteredGlobalValue(ServerMeter.QUERIES_KILLED_SCAN, 1L); + verify(_serverMetrics, never()).addMeteredTableValue(anyString(), + eq(ServerMeter.QUERIES_KILLED_SCAN), anyLong()); + } + + @Test + public void testUnknownTableNameSentinelFallsBackToGlobalMetric() { + QueryMonitorConfig config = buildConfig("enforce", 100L, Long.MAX_VALUE); + AtomicReference configRef = new AtomicReference<>(config); + QueryKillingManager manager = new QueryKillingManager(configRef, _serverMetrics); + manager.rebuildStrategy(); + + QueryExecutionContext execCtx = QueryExecutionContext.forSseTest(); + QueryScanCostContext scanCtx = new QueryScanCostContext(); + scanCtx.addEntriesScannedInFilter(200L); + + // Use the convenience overload with null tableName → routes through "unknown" sentinel + execCtx.setTableName(null); + execCtx.setQueryId("q-unknown"); + execCtx.setCachedKillingStrategy(manager.resolveQueryStrategy(null)); + + manager.checkAndKillIfNeeded(execCtx, scanCtx); + + verify(_serverMetrics).addMeteredGlobalValue(ServerMeter.QUERIES_KILLED_SCAN, 1L); + verify(_serverMetrics, never()).addMeteredTableValue(eq("unknown"), + eq(ServerMeter.QUERIES_KILLED_SCAN), anyLong()); + } /** * A test strategy that always kills — used to verify custom factory loading. @@ -308,8 +701,8 @@ public boolean shouldTerminate(QueryScanCostContext context) { @Override public QueryKillReport buildKillReport(QueryScanCostContext context, - String queryId, String tableName, String configSource) { - return new QueryKillReport(queryId, tableName, "AlwaysKillStrategy", + long requestId, String queryId, String tableName, String configSource) { + return new QueryKillReport(requestId, queryId, tableName, "AlwaysKillStrategy", "always", 0, 0, configSource, context); } } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/killing/strategy/ScanEntriesThresholdStrategyTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/killing/strategy/ScanEntriesThresholdStrategyTest.java index 48785498162d..e64c1985c909 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/query/killing/strategy/ScanEntriesThresholdStrategyTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/query/killing/strategy/ScanEntriesThresholdStrategyTest.java @@ -81,7 +81,7 @@ public void testBuildKillReportForEntriesScanned() { QueryScanCostContext ctx = new QueryScanCostContext(); ctx.addEntriesScannedInFilter(150_000_000); - QueryKillReport report = strategy.buildKillReport(ctx, "q1", "myTable", "cluster"); + QueryKillReport report = strategy.buildKillReport(ctx, 1L, "q1", "myTable", "cluster"); assertEquals(report.getTriggeringMetric(), "numEntriesScannedInFilter"); assertEquals(report.getActualValue(), 150_000_000L); assertEquals(report.getThresholdValue(), 100_000_000L); @@ -93,7 +93,7 @@ public void testBuildKillReportForDocsScanned() { QueryScanCostContext ctx = new QueryScanCostContext(); ctx.addDocsScanned(15_000_000); - QueryKillReport report = strategy.buildKillReport(ctx, "q2", "myTable", "table:myTable"); + QueryKillReport report = strategy.buildKillReport(ctx, 2L, "q2", "myTable", "table:myTable"); assertEquals(report.getTriggeringMetric(), "numDocsScanned"); assertEquals(report.getActualValue(), 15_000_000L); assertEquals(report.getThresholdValue(), 10_000_000L); @@ -122,6 +122,64 @@ public void testEitherMetricCanTrigger() { assertTrue(strategy.shouldTerminate(ctx2)); } + @Test + public void testAbovePostFilterThresholdKills() { + ScanEntriesThresholdStrategy strategy = + new ScanEntriesThresholdStrategy(Long.MAX_VALUE, Long.MAX_VALUE, 1000L); + QueryScanCostContext ctx = new QueryScanCostContext(); + ctx.addEntriesScannedPostFilter(1001); + assertTrue(strategy.shouldTerminate(ctx)); + } + + @Test + public void testBelowPostFilterThresholdDoesNotKill() { + ScanEntriesThresholdStrategy strategy = + new ScanEntriesThresholdStrategy(Long.MAX_VALUE, Long.MAX_VALUE, 1000L); + QueryScanCostContext ctx = new QueryScanCostContext(); + ctx.addEntriesScannedPostFilter(999); + assertFalse(strategy.shouldTerminate(ctx)); + } + + @Test + public void testBuildKillReportForPostFilter() { + ScanEntriesThresholdStrategy strategy = + new ScanEntriesThresholdStrategy(Long.MAX_VALUE, Long.MAX_VALUE, 5000L); + QueryScanCostContext ctx = new QueryScanCostContext(); + ctx.addEntriesScannedPostFilter(7500); + + QueryKillReport report = strategy.buildKillReport(ctx, 3L, "q3", "myTable", "cluster"); + assertEquals(report.getTriggeringMetric(), "numEntriesScannedPostFilter"); + assertEquals(report.getActualValue(), 7500L); + assertEquals(report.getThresholdValue(), 5000L); + } + + @Test + public void testPostFilterOverrideViaForQuery() { + ScanEntriesThresholdStrategy strategy = + new ScanEntriesThresholdStrategy(100L, 200L, 300L); + QueryConfig queryConfig = new QueryConfig(null, null, null, null, null, null, null, null, 600L); + + QueryKillingStrategy result = strategy.forQuery(queryConfig, null); + assertTrue(result != strategy); + ScanEntriesThresholdStrategy overridden = (ScanEntriesThresholdStrategy) result; + assertEquals(overridden.getMaxEntriesScannedInFilter(), 100L); + assertEquals(overridden.getMaxDocsScanned(), 200L); + assertEquals(overridden.getMaxEntriesScannedPostFilter(), 600L); + } + + @Test + public void testPostFilterPriorityInBuildKillReport() { + // When both entries-in-filter and post-filter exceed, entries-in-filter takes priority + ScanEntriesThresholdStrategy strategy = + new ScanEntriesThresholdStrategy(100L, Long.MAX_VALUE, 500L); + QueryScanCostContext ctx = new QueryScanCostContext(); + ctx.addEntriesScannedInFilter(200); + ctx.addEntriesScannedPostFilter(1000); + + QueryKillReport report = strategy.buildKillReport(ctx, 4L, "q4", "myTable", "cluster"); + assertEquals(report.getTriggeringMetric(), "numEntriesScannedInFilter"); + } + // --- forQuery() table override tests --- @Test diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java index 638553f05d1e..c30b8817dd85 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java @@ -85,6 +85,7 @@ import org.apache.pinot.core.data.manager.realtime.RealtimeConsumptionRateManager; import org.apache.pinot.core.data.manager.realtime.ServerRateLimitConfigChangeListener; import org.apache.pinot.core.instance.context.ServerContext; +import org.apache.pinot.core.query.killing.QueryKillingManager; import org.apache.pinot.core.query.scheduler.QuerySchedulerThreadPoolConfigChangeListener; import org.apache.pinot.core.query.scheduler.resources.ResourceManager; import org.apache.pinot.core.transport.ListenerConfig; @@ -187,6 +188,7 @@ public abstract class BaseServerStarter implements ServiceStartable { protected PinotEnvironmentProvider _pinotEnvironmentProvider; protected SegmentOperationsThrottlerSet _segmentOperationsThrottlerSet; protected ThreadAccountant _threadAccountant; + protected QueryKillingManager _queryKillingManager; protected DefaultClusterConfigChangeHandler _clusterConfigChangeHandler; protected volatile boolean _isServerReadyToServeQueries = false; protected ScheduledExecutorService _helixMessageCountScheduler; @@ -758,6 +760,10 @@ public void start() _threadAccountant = ThreadAccountantUtils.createAccountant(accountingConfig, _instanceId, org.apache.pinot.spi.config.instance.InstanceType.SERVER); + // Initialize scan-based query killing + PinotConfiguration schedulerConfig = _serverConf.subset(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX); + _queryKillingManager = QueryKillingManager.init(schedulerConfig, ServerMetrics.get()); + SendStatsPredicate sendStatsPredicate = SendStatsPredicate.create(_serverConf, _helixManager); KeepPipelineBreakerStatsPredicate keepPipelineBreakerStatsPredicate = KeepPipelineBreakerStatsPredicate.create(_serverConf); @@ -775,6 +781,11 @@ public void start() new ServerRateLimitConfigChangeListener(_serverMetrics); _clusterConfigChangeHandler.registerClusterConfigChangeListener(serverRateLimitConfigChangeListener); + // Register query killing manager for dynamic config updates (threshold/mode changes via ZK) + if (_queryKillingManager != null) { + _clusterConfigChangeHandler.registerClusterConfigChangeListener(_queryKillingManager); + } + initSegmentFetcher(_serverConf); StateModelFactory stateModelFactory = createSegmentOnlineOfflineStateModelFactory(instanceDataManager, _transitionThreadPoolManager); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/QueryConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/QueryConfig.java index 093b7c340fa9..92fb399f5842 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/QueryConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/QueryConfig.java @@ -24,6 +24,7 @@ import java.util.Map; import javax.annotation.Nullable; import org.apache.pinot.spi.config.BaseJsonConfig; +import org.apache.pinot.spi.utils.CommonConstants.Accounting.ScanKillingMode; /** @@ -62,6 +63,10 @@ public class QueryConfig extends BaseJsonConfig { private final Long _maxEntriesScannedPostFilter; + // Per-table scan-based killing mode override. Null means use the cluster-level mode. + // Valid values: "disabled", "logOnly", "enforce" (case-insensitive). + private final String _scanKillingMode; + public QueryConfig(@Nullable Long timeoutMs, @Nullable Boolean disableGroovy, @Nullable Boolean useApproximateFunction, @Nullable Map expressionOverrideMap, @@ -70,6 +75,16 @@ public QueryConfig(@Nullable Long timeoutMs, @Nullable Boolean disableGroovy, maxQueryResponseSizeBytes, maxServerResponseSizeBytes, null, null, null); } + public QueryConfig(@Nullable Long timeoutMs, @Nullable Boolean disableGroovy, + @Nullable Boolean useApproximateFunction, @Nullable Map expressionOverrideMap, + @Nullable Long maxQueryResponseSizeBytes, @Nullable Long maxServerResponseSizeBytes, + @Nullable Long maxEntriesScannedInFilter, @Nullable Long maxDocsScanned, + @Nullable Long maxEntriesScannedPostFilter) { + this(timeoutMs, disableGroovy, useApproximateFunction, expressionOverrideMap, + maxQueryResponseSizeBytes, maxServerResponseSizeBytes, maxEntriesScannedInFilter, maxDocsScanned, + maxEntriesScannedPostFilter, null); + } + @JsonCreator public QueryConfig(@JsonProperty("timeoutMs") @Nullable Long timeoutMs, @JsonProperty("disableGroovy") @Nullable Boolean disableGroovy, @@ -79,7 +94,8 @@ public QueryConfig(@JsonProperty("timeoutMs") @Nullable Long timeoutMs, @JsonProperty("maxServerResponseSizeBytes") @Nullable Long maxServerResponseSizeBytes, @JsonProperty("maxEntriesScannedInFilter") @Nullable Long maxEntriesScannedInFilter, @JsonProperty("maxDocsScanned") @Nullable Long maxDocsScanned, - @JsonProperty("maxEntriesScannedPostFilter") @Nullable Long maxEntriesScannedPostFilter) { + @JsonProperty("maxEntriesScannedPostFilter") @Nullable Long maxEntriesScannedPostFilter, + @JsonProperty("scanKillingMode") @Nullable String scanKillingMode) { Preconditions.checkArgument(timeoutMs == null || timeoutMs > 0, "Invalid 'timeoutMs': %s", timeoutMs); Preconditions.checkArgument(maxQueryResponseSizeBytes == null || maxQueryResponseSizeBytes > 0, "Invalid 'maxQueryResponseSizeBytes': %s", maxQueryResponseSizeBytes); @@ -91,6 +107,8 @@ public QueryConfig(@JsonProperty("timeoutMs") @Nullable Long timeoutMs, "Invalid 'maxDocsScanned': %s", maxDocsScanned); Preconditions.checkArgument(maxEntriesScannedPostFilter == null || maxEntriesScannedPostFilter > 0, "Invalid 'maxEntriesScannedPostFilter': %s", maxEntriesScannedPostFilter); + Preconditions.checkArgument(scanKillingMode == null || ScanKillingMode.fromConfigValue(scanKillingMode) != null, + "Invalid 'scanKillingMode': %s. Valid values: disabled, logOnly, enforce", scanKillingMode); _timeoutMs = timeoutMs; _disableGroovy = disableGroovy; @@ -101,6 +119,7 @@ public QueryConfig(@JsonProperty("timeoutMs") @Nullable Long timeoutMs, _maxEntriesScannedInFilter = maxEntriesScannedInFilter; _maxDocsScanned = maxDocsScanned; _maxEntriesScannedPostFilter = maxEntriesScannedPostFilter; + _scanKillingMode = scanKillingMode; } @Nullable @@ -156,4 +175,10 @@ public Long getMaxDocsScanned() { public Long getMaxEntriesScannedPostFilter() { return _maxEntriesScannedPostFilter; } + + @Nullable + @JsonProperty("scanKillingMode") + public String getScanKillingMode() { + return _scanKillingMode; + } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryExecutionContext.java b/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryExecutionContext.java index ca970cf5ff19..0ccb0da03075 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryExecutionContext.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryExecutionContext.java @@ -65,6 +65,22 @@ public enum QueryType { private volatile TerminationException _terminateException; + /// Per-query scan cost accumulators for scan-based killing, tracking cumulative scan cost across all segments. + @Nullable + private volatile QueryScanCostContext _queryScanCostContext; + + @Nullable + private volatile Object _cachedKillingStrategy; + + @Nullable + private volatile String _tableName; + + @Nullable + private volatile String _queryId; + + @Nullable + private volatile Accounting.ScanKillingMode _effectiveScanKillingMode; + public QueryExecutionContext(QueryType queryType, long requestId, String cid, String workloadName, long startTimeMs, long activeDeadlineMs, long passiveDeadlineMs, String brokerId, String instanceId, String queryHash) { _queryType = queryType; @@ -193,4 +209,58 @@ public synchronized boolean terminate(QueryErrorCode errorCode, String message) public TerminationException getTerminateException() { return _terminateException; } + + @Nullable + public QueryScanCostContext getQueryScanCostContext() { + return _queryScanCostContext; + } + + public void setQueryScanCostContext(@Nullable QueryScanCostContext queryScanCostContext) { + _queryScanCostContext = queryScanCostContext; + } + + @Nullable + public Object getCachedKillingStrategy() { + return _cachedKillingStrategy; + } + + public void setCachedKillingStrategy(@Nullable Object cachedKillingStrategy) { + _cachedKillingStrategy = cachedKillingStrategy; + } + + @Nullable + public String getTableName() { + return _tableName; + } + + public void setTableName(@Nullable String tableName) { + _tableName = tableName; + } + + @Nullable + public String getQueryId() { + return _queryId; + } + + public void setQueryId(@Nullable String queryId) { + _queryId = queryId; + } + + /** + * Returns the per-table scan killing mode override set for this query, or {@code null} if no + * table-level override is configured. When {@code null}, the cluster-level mode from + * {@link org.apache.pinot.spi.utils.CommonConstants.Accounting} applies. + */ + @Nullable + public Accounting.ScanKillingMode getEffectiveScanKillingMode() { + return _effectiveScanKillingMode; + } + + /** + * Sets the per-table scan killing mode for this query. Pass {@code null} to fall back to the + * cluster-level mode. Called once during query initialization; thread-safe via {@code volatile}. + */ + public void setEffectiveScanKillingMode(@Nullable Accounting.ScanKillingMode effectiveScanKillingMode) { + _effectiveScanKillingMode = effectiveScanKillingMode; + } } diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/QueryConfigScanKillingTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/QueryConfigScanKillingTest.java index 5d6ea187d2dc..fce5748652c4 100644 --- a/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/QueryConfigScanKillingTest.java +++ b/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/QueryConfigScanKillingTest.java @@ -19,6 +19,7 @@ package org.apache.pinot.spi.config.table; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.pinot.spi.utils.CommonConstants.Accounting.ScanKillingMode; import org.testng.annotations.Test; import static org.testng.Assert.assertEquals; @@ -92,4 +93,71 @@ public void testNegativeMaxEntriesScannedInFilterThrows() { public void testZeroMaxDocsScannedThrows() { new QueryConfig(null, null, null, null, null, null, null, 0L, null); } + + @Test + public void testScanKillingModeDefaultsToNull() { + QueryConfig config = new QueryConfig(null, null, null, null, null, null, null, null, null); + assertNull(config.getScanKillingMode()); + } + + @Test + public void testScanKillingModeSetExplicitly() { + QueryConfig config = new QueryConfig(null, null, null, null, null, null, null, null, null, "enforce"); + assertEquals(config.getScanKillingMode(), "enforce"); + } + + @Test + public void testScanKillingModeJsonRoundTrip() + throws Exception { + QueryConfig config = new QueryConfig(null, null, null, null, null, null, 500_000L, null, null, "logOnly"); + + String json = OBJECT_MAPPER.writeValueAsString(config); + QueryConfig deserialized = OBJECT_MAPPER.readValue(json, QueryConfig.class); + + assertEquals(deserialized.getScanKillingMode(), "logOnly"); + assertEquals(deserialized.getMaxEntriesScannedInFilter(), Long.valueOf(500_000L)); + } + + @Test + public void testScanKillingModeDeserializesFromJson() + throws Exception { + String json = "{\"maxDocsScanned\": 5000000, \"scanKillingMode\": \"disabled\"}"; + QueryConfig config = OBJECT_MAPPER.readValue(json, QueryConfig.class); + assertEquals(config.getScanKillingMode(), "disabled"); + assertEquals(config.getMaxDocsScanned(), Long.valueOf(5_000_000L)); + } + + @Test + public void testScanKillingModeAbsentInJsonDeserializesToNull() + throws Exception { + String json = "{\"maxDocsScanned\": 5000000}"; + QueryConfig config = OBJECT_MAPPER.readValue(json, QueryConfig.class); + assertNull(config.getScanKillingMode()); + } + + @Test + public void testScanKillingModeStringParsesToCorrectEnum() { + // Verify the production parse path: getScanKillingMode() string → ScanKillingMode enum + QueryConfig enforce = new QueryConfig(null, null, null, null, null, null, null, null, null, "enforce"); + assertEquals(ScanKillingMode.fromConfigValue(enforce.getScanKillingMode()), ScanKillingMode.ENFORCE); + + QueryConfig logOnly = new QueryConfig(null, null, null, null, null, null, null, null, null, "logOnly"); + assertEquals(ScanKillingMode.fromConfigValue(logOnly.getScanKillingMode()), ScanKillingMode.LOG_ONLY); + + QueryConfig disabled = new QueryConfig(null, null, null, null, null, null, null, null, null, "disabled"); + assertEquals(ScanKillingMode.fromConfigValue(disabled.getScanKillingMode()), ScanKillingMode.DISABLED); + } + + @Test + public void testScanKillingModeCaseInsensitiveParsing() { + // fromConfigValue is case-insensitive, so "Enforce" is valid + QueryConfig config = new QueryConfig(null, null, null, null, null, null, null, null, null, "Enforce"); + assertEquals(ScanKillingMode.fromConfigValue(config.getScanKillingMode()), ScanKillingMode.ENFORCE); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testInvalidScanKillingModeThrowsAtConstruction() { + // A completely unrecognized value fails at construction time, not silently at query time + new QueryConfig(null, null, null, null, null, null, null, null, null, "enforced"); + } }