Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,8 @@ public QueryMonitorConfig(PinotConfiguration config, long maxHeapSize) {
(String) null);
}

QueryMonitorConfig(QueryMonitorConfig oldConfig, Set<String> changedConfigs, Map<String, String> clusterConfigs) {
public QueryMonitorConfig(QueryMonitorConfig oldConfig, Set<String> changedConfigs,
Map<String, String> clusterConfigs) {
_maxHeapSize = oldConfig._maxHeapSize;

if (changedConfigs.contains(Accounting.Keys.MIN_MEMORY_FOOTPRINT_TO_KILL_RATIO)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
import org.apache.pinot.core.common.Block;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.plan.ExplainInfo;
import org.apache.pinot.core.query.killing.QueryKillingManager;
import org.apache.pinot.spi.exception.TerminationException;
import org.apache.pinot.spi.query.QueryExecutionContext;
import org.apache.pinot.spi.query.QueryScanCostContext;
import org.apache.pinot.spi.query.QueryThreadContext;
import org.apache.pinot.spi.trace.InvocationScope;
import org.apache.pinot.spi.trace.Tracing;
Expand Down Expand Up @@ -55,10 +59,36 @@ public ExplainInfo getExplainInfo() {

protected void checkTermination() {
QueryThreadContext.checkTermination(this::getExplainName);
checkScanBasedKilling();
}

protected void checkTerminationAndSampleUsage() {
QueryThreadContext.checkTerminationAndSampleUsage(this::getExplainName);
checkScanBasedKilling();
}

private void checkScanBasedKilling() {
QueryKillingManager killingManager = QueryKillingManager.getInstance();
if (killingManager != null) {
QueryThreadContext ctx = QueryThreadContext.getIfAvailable();
if (ctx != null) {
QueryExecutionContext execCtx = ctx.getExecutionContext();
QueryScanCostContext scanCost = execCtx.getQueryScanCostContext();
if (scanCost != null) {
killingManager.checkAndKillIfNeeded(execCtx, scanCost);
TerminationException te = execCtx.getTerminateException();
if (te != null) {
throw te;
}
}
}
}
}

@javax.annotation.Nullable
protected static QueryScanCostContext getScanCostContext() {
QueryThreadContext ctx = QueryThreadContext.getIfAvailable();
return ctx != null ? ctx.getExecutionContext().getQueryScanCostContext() : null;
}

protected List<ExplainInfo> getChildrenExplainInfo() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.pinot.core.operator.filter.BaseFilterOperator;
import org.apache.pinot.core.plan.DocIdSetPlanNode;
import org.apache.pinot.segment.spi.Constants;
import org.apache.pinot.spi.query.QueryScanCostContext;
import org.apache.pinot.spi.query.QueryThreadContext;


Expand All @@ -49,6 +50,7 @@ public class DocIdSetOperator extends BaseDocIdSetOperator {
private BlockDocIdSet _blockDocIdSet;
private BlockDocIdIterator _blockDocIdIterator;
private int _currentDocId = 0;
private long _lastReportedEntriesScanned = 0;

public DocIdSetOperator(BaseFilterOperator filterOperator, int maxSizeOfDocIdSet) {
Preconditions.checkArgument(maxSizeOfDocIdSet > 0 && maxSizeOfDocIdSet <= DocIdSetPlanNode.MAX_DOC_PER_CALL);
Expand Down Expand Up @@ -80,6 +82,16 @@ protected DocIdSetBlock getNextBlock() {
docIds[pos++] = _currentDocId;
}
if (pos > 0) {
// Push scan cost delta for proactive query killing
QueryScanCostContext scanCost = getScanCostContext();
if (scanCost != null) {
long currentTotal = _blockDocIdSet.getNumEntriesScannedInFilter();
long delta = currentTotal - _lastReportedEntriesScanned;
if (delta > 0) {
scanCost.addEntriesScannedInFilter(delta);
_lastReportedEntriesScanned = currentTotal;
}
}
return new DocIdSetBlock(docIds, pos);
} else {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils.AggregationInfo;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.startree.executor.StarTreeAggregationExecutor;
import org.apache.pinot.spi.query.QueryScanCostContext;


/**
Expand Down Expand Up @@ -72,6 +73,12 @@ protected AggregationResultsBlock getNextBlock() {
ValueBlock valueBlock;
while ((valueBlock = _projectOperator.nextBlock()) != null) {
_numDocsScanned += valueBlock.getNumDocs();
QueryScanCostContext scanCost = getScanCostContext();
if (scanCost != null) {
scanCost.addDocsScanned(valueBlock.getNumDocs());
scanCost.addEntriesScannedPostFilter(
(long) valueBlock.getNumDocs() * _projectOperator.getNumColumnsProjected());
}
aggregationExecutor.aggregate(valueBlock);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.pinot.core.query.distinct.DistinctExecutorFactory;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.spi.query.QueryScanCostContext;


/**
Expand Down Expand Up @@ -60,6 +61,12 @@ protected DistinctResultsBlock getNextBlock() {
ValueBlock valueBlock;
while ((valueBlock = _projectOperator.nextBlock()) != null) {
_numDocsScanned += valueBlock.getNumDocs();
QueryScanCostContext scanCost = getScanCostContext();
if (scanCost != null) {
scanCost.addDocsScanned(valueBlock.getNumDocs());
scanCost.addEntriesScannedPostFilter(
(long) valueBlock.getNumDocs() * _projectOperator.getNumColumnsProjected());
}
if (executor.process(valueBlock)) {
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils.AggregationInfo;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.startree.executor.StarTreeAggregationExecutor;
import org.apache.pinot.spi.query.QueryScanCostContext;


/**
Expand Down Expand Up @@ -95,6 +96,12 @@ protected AggregationResultsBlock getNextBlock() {
result[resultIndexMap.get(aggregationFunctions[i])] = filteredResult.get(i);
}
_numDocsScanned += numDocsScanned;
QueryScanCostContext scanCost = getScanCostContext();
if (scanCost != null) {
scanCost.addDocsScanned(numDocsScanned);
scanCost.addEntriesScannedPostFilter(
(long) numDocsScanned * projectOperator.getNumColumnsProjected());
}
_numEntriesScannedInFilter += projectOperator.getExecutionStatistics().getNumEntriesScannedInFilter();
_numEntriesScannedPostFilter += (long) numDocsScanned * projectOperator.getNumColumnsProjected();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.startree.executor.StarTreeGroupByExecutor;
import org.apache.pinot.spi.query.QueryScanCostContext;
import org.apache.pinot.spi.trace.Tracing;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -160,6 +161,12 @@ protected GroupByResultsBlock getNextBlock() {
}

_numDocsScanned += numDocsScanned;
QueryScanCostContext scanCost = getScanCostContext();
if (scanCost != null) {
scanCost.addDocsScanned(numDocsScanned);
scanCost.addEntriesScannedPostFilter(
(long) numDocsScanned * projectOperator.getNumColumnsProjected());
}
_numEntriesScannedInFilter += projectOperator.getExecutionStatistics().getNumEntriesScannedInFilter();
_numEntriesScannedPostFilter += (long) numDocsScanned * projectOperator.getNumColumnsProjected();
GroupByResultHolder[] filterGroupByResults = groupByExecutor.getGroupByResultHolders();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.pinot.core.query.aggregation.groupby.GroupByExecutor;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.startree.executor.StarTreeGroupByExecutor;
import org.apache.pinot.spi.query.QueryScanCostContext;
import org.apache.pinot.spi.trace.Tracing;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -119,6 +120,12 @@ protected GroupByResultsBlock getNextBlock() {

while ((valueBlock = _projectOperator.nextBlock()) != null) {
_numDocsScanned += valueBlock.getNumDocs();
QueryScanCostContext scanCost = getScanCostContext();
if (scanCost != null) {
scanCost.addDocsScanned(valueBlock.getNumDocs());
scanCost.addEntriesScannedPostFilter(
(long) valueBlock.getNumDocs() * _projectOperator.getNumColumnsProjected());
}
groupByExecutor.process(valueBlock);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.spi.query.QueryScanCostContext;
import org.roaringbitmap.RoaringBitmap;


Expand Down Expand Up @@ -124,6 +125,12 @@ protected SelectionResultsBlock getNextBlock() {
int numDocsToAdd = Math.min(_numRowsToKeep - _rows.size(), valueBlock.getNumDocs());
_rows.ensureCapacity(_rows.size() + numDocsToAdd);
_numDocsScanned += numDocsToAdd;
QueryScanCostContext scanCost = getScanCostContext();
if (scanCost != null) {
scanCost.addDocsScanned(numDocsToAdd);
scanCost.addEntriesScannedPostFilter(
(long) numDocsToAdd * _projectOperator.getNumColumnsProjected());
}
if (_nullHandlingEnabled) {
for (int i = 0; i < numExpressions; i++) {
_nullBitmaps[i] = _blockValSets[i].getNullBitmap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.pinot.core.query.utils.OrderByComparatorFactory;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.datasource.DataSource;
import org.apache.pinot.spi.query.QueryScanCostContext;
import org.roaringbitmap.RoaringBitmap;


Expand Down Expand Up @@ -189,6 +190,11 @@ private SelectionResultsBlock computeAllOrdered() {
}
}
_numDocsScanned += numDocsFetched;
QueryScanCostContext scanCost = getScanCostContext();
if (scanCost != null) {
scanCost.addDocsScanned(numDocsFetched);
scanCost.addEntriesScannedPostFilter((long) numDocsFetched * numColumnsProjected);
}
}
_numEntriesScannedPostFilter = (long) _numDocsScanned * numColumnsProjected;

Expand Down Expand Up @@ -253,6 +259,11 @@ private SelectionResultsBlock computePartiallyOrdered() {
}
}
_numDocsScanned += numDocsFetched;
QueryScanCostContext scanCost2 = getScanCostContext();
if (scanCost2 != null) {
scanCost2.addDocsScanned(numDocsFetched);
scanCost2.addEntriesScannedPostFilter((long) numDocsFetched * numColumnsProjected);
}
}
_numEntriesScannedPostFilter = (long) _numDocsScanned * numColumnsProjected;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.pinot.core.operator.blocks.ValueBlock;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.spi.query.QueryScanCostContext;


/**
Expand Down Expand Up @@ -71,6 +72,12 @@ protected List<Object[]> fetch(Supplier<ListBuilder> listBuilderSupplier) {
IntFunction<Object[]> rowFetcher = fetchBlock(valueBlock, blockValSets);
int numDocsFetched = valueBlock.getNumDocs();
_numDocsScanned += numDocsFetched;
QueryScanCostContext scanCost = getScanCostContext();
if (scanCost != null) {
scanCost.addDocsScanned(numDocsFetched);
scanCost.addEntriesScannedPostFilter(
(long) numDocsFetched * _projectOperator.getNumColumnsProjected());
}
ListBuilder listBuilder = listBuilderSupplier.get();

// first, calculate the best rows on this block
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.pinot.core.operator.blocks.ValueBlock;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.spi.query.QueryScanCostContext;


/**
Expand Down Expand Up @@ -70,6 +71,12 @@ protected List<Object[]> fetch(Supplier<ListBuilder> listBuilderSupplier) {
IntFunction<Object[]> rowFetcher = fetchBlock(valueBlock, blockValSets);
int numDocsFetched = valueBlock.getNumDocs();
_numDocsScanned += numDocsFetched;
QueryScanCostContext scanCost = getScanCostContext();
if (scanCost != null) {
scanCost.addDocsScanned(numDocsFetched);
scanCost.addEntriesScannedPostFilter(
(long) numDocsFetched * _projectOperator.getNumColumnsProjected());
}
for (int i = 0; i < numDocsFetched; i++) {
if (listBuilder.add(rowFetcher.apply(i))) {
return listBuilder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.spi.query.QueryScanCostContext;
import org.roaringbitmap.RoaringBitmap;


Expand Down Expand Up @@ -122,6 +123,12 @@ protected SelectionResultsBlock getNextBlock() {
}
}
_numDocsScanned += numDocs;
QueryScanCostContext scanCost = getScanCostContext();
if (scanCost != null) {
scanCost.addDocsScanned(numDocs);
scanCost.addEntriesScannedPostFilter(
(long) numDocs * _projectOperator.getNumColumnsProjected());
}
return new SelectionResultsBlock(_dataSchema, rows, _queryContext);
}

Expand Down
Loading
Loading