Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
3c1674d
[multistage] Add SubmitWithStream RPC + MultiStageStatsTree proto def…
gortiz Apr 30, 2026
99eef19
[multistage] Capture op->PlanNode mapping on OpChainExecutionContext
gortiz May 4, 2026
49ba40e
[multistage] Add MultiStageStatsTreeEncoder for stream-mode stats rep…
gortiz May 4, 2026
0f6ab03
[multistage] Add MultiStageStatsTree decoder + tree-shape merge
gortiz May 4, 2026
8bea7ef
[multistage] Add SubmitWithStream handler skeleton on QueryServer
gortiz May 5, 2026
ec56450
[multistage] Wire opchain completion listener into SubmitWithStream
gortiz May 5, 2026
923cd24
[multistage] Suppress mailbox stats when SubmitWithStream is in use
gortiz May 5, 2026
63a76b8
[multistage] Broker-side streaming session + completion semantics
gortiz May 5, 2026
065e0e7
[multistage] DispatchClient.submitWithStream + StreamingDispatchObserver
gortiz May 5, 2026
6475f74
[multistage] StreamingDispatchObserver tests
gortiz May 5, 2026
f1692d0
[multistage] Wire SubmitWithStream into broker QueryDispatcher
gortiz May 6, 2026
87f00a8
[multistage] Document mixed-version policy for stream-mode dispatch
gortiz May 6, 2026
a26def6
[multistage] Add cluster-level config for stream-mode stats reporting
gortiz May 6, 2026
0284485
[multistage] Expose per-stage stats coverage on QueryResult and broke…
gortiz May 6, 2026
3a092be
[multistage] Add StreamStatsReportingIntegrationTest + fix config nam…
gortiz May 6, 2026
3c3dffc
[multistage] Phase B: cancel-via-stream + O(1) cancel in OpChainSched…
gortiz May 11, 2026
09f56da
[multistage] Fix 3 checkstyle violations caught by GHA linter
gortiz May 11, 2026
08295d9
[multistage] OperatorTypeDescriptor SPI + OperatorTypeRegistry
gortiz May 12, 2026
a6acb08
Fix checkstyle: wrap long assertion message line in OperatorTypeRegis…
gortiz May 12, 2026
5b329b5
Add merge-resilience tests; rename stream stats option to streamStats
gortiz May 12, 2026
da81c97
[query-planner] PlannerContext implements Context; expose per-query o…
gortiz May 12, 2026
4313223
Merge branch 'master' into feature/mse-push-stats
gortiz May 13, 2026
adf2bb0
Fix StreamingDispatchObserver DONE handler to drain latch when server…
gortiz May 13, 2026
39e5065
Add edge-case tests for StreamingDispatchObserver DONE/onError intera…
gortiz May 13, 2026
d43a203
Address review feedback: decode outside lock, 50ms error-path cap, JM…
gortiz May 14, 2026
39d12cb
Address review: fix onError race, add stream-stats metrics, improve s…
gortiz May 14, 2026
886ab99
Improve BenchmarkStreamStatsMSQE: 4 threads, 2 servers, triple-join, …
gortiz May 14, 2026
237e252
Address review: depth cap in decoder, gate plan-node maps on isSendSt…
gortiz May 20, 2026
9d1dc55
Fix liveness gap: increment opchain counter in finally, always call o…
gortiz May 20, 2026
21cb248
Move stream-stats routing default into QueryDispatcher; remove per-qu…
gortiz May 20, 2026
e1c6f01
Fix three correctness bugs in SubmitWithStream stats path
gortiz May 21, 2026
af675a8
Address review: doc-only fixes for stream-stats push feature (fixes #…
gortiz May 21, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pinot.broker.requesthandler;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.time.Duration;
Expand Down Expand Up @@ -139,6 +140,7 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
private final Set<String> _defaultDisabledPlannerRules;
protected final long _extraPassiveTimeoutMs;
protected final boolean _enableQueryFingerprinting;
private final boolean _streamStatsDefault;

protected final PinotMeter _stagesStartedMeter = BrokerMeter.MSE_STAGES_STARTED.getGlobalMeter();
protected final PinotMeter _stagesFinishedMeter = BrokerMeter.MSE_STAGES_COMPLETED.getGlobalMeter();
Expand Down Expand Up @@ -181,10 +183,13 @@ public MultiStageBrokerRequestHandler(PinotConfiguration config, String brokerId
boolean dispatchKeepAliveWithoutCalls = config.getProperty(
CommonConstants.MultiStageQueryRunner.KEY_OF_DISPATCH_CHANNEL_KEEP_ALIVE_WITHOUT_CALLS,
CommonConstants.MultiStageQueryRunner.DEFAULT_OF_DISPATCH_CHANNEL_KEEP_ALIVE_WITHOUT_CALLS);
_streamStatsDefault = _config.getProperty(
CommonConstants.Broker.CONFIG_OF_STREAM_STATS,
CommonConstants.Broker.DEFAULT_STREAM_STATS);
_queryDispatcher =
new QueryDispatcher(new MailboxService(hostname, port, InstanceType.BROKER, config, tlsConfig), failureDetector,
tlsConfig, isQueryCancellationEnabled(), cancelTimeout, dispatchKeepAliveTimeMs,
dispatchKeepAliveTimeoutMs, dispatchKeepAliveWithoutCalls);
dispatchKeepAliveTimeoutMs, dispatchKeepAliveWithoutCalls, _streamStatsDefault);
LOGGER.info("Initialized MultiStageBrokerRequestHandler on host: {}, port: {} with broker id: {}, timeout: {}ms, "
+ "query log max length: {}, query log max rate: {}, query cancellation enabled: {}", hostname, port,
_brokerId, _brokerTimeoutMs, _queryLogger.getMaxQueryLengthToLog(), _queryLogger.getLogRateLimit(),
Expand Down Expand Up @@ -728,7 +733,18 @@ private BrokerResponse query(QueryEnvironment.CompiledQuery query, long requestI
}
}

fillOldBrokerResponseStats(brokerResponse, queryResults.getQueryStats(), dispatchableSubPlan);
fillOldBrokerResponseStats(brokerResponse, queryResults.getQueryStats(), dispatchableSubPlan,
queryResults.getStageCoverage());

if (QueryOptionsUtils.isStreamStats(query.getOptions(), _streamStatsDefault)) {
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.MSE_STREAM_STATS_QUERIES, 1);
List<QueryDispatcher.QueryResult.StageCoverage> coverage = queryResults.getStageCoverage();
if (coverage != null && coverage.stream()
.anyMatch(c -> c != null && (c.getMissing() > 0 || c.getMergeFailed() > 0))) {
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.MSE_STREAM_STATS_INCOMPLETE_COVERAGE, 1);
}
}

long totalTimeMs = System.currentTimeMillis() - requestContext.getRequestArrivalTimeMillis();
_brokerMetrics.addTimedValue(BrokerTimer.MULTI_STAGE_QUERY_TOTAL_TIME_MS, totalTimeMs, TimeUnit.MILLISECONDS);

Expand Down Expand Up @@ -818,7 +834,8 @@ private Collection<PlanNode> requestPhysicalPlan(DispatchablePlanFragment fragme
}

private void fillOldBrokerResponseStats(BrokerResponseNativeV2 brokerResponse,
List<MultiStageQueryStats.StageStats.Closed> queryStats, DispatchableSubPlan dispatchableSubPlan) {
List<MultiStageQueryStats.StageStats.Closed> queryStats, DispatchableSubPlan dispatchableSubPlan,
@Nullable List<QueryDispatcher.QueryResult.StageCoverage> stageCoverage) {
try {
Map<Integer, DispatchablePlanFragment> queryStageMap = dispatchableSubPlan.getQueryStageMap();

Expand All @@ -844,6 +861,20 @@ private void fillOldBrokerResponseStats(BrokerResponseNativeV2 brokerResponse,
brokerResponse.setStageStats(JsonNodeFactory.instance.objectNode()
.put("error", "Error encountered while collecting multi-stage stats - " + e));
}
if (stageCoverage != null) {
ArrayNode coverageArray = JsonNodeFactory.instance.arrayNode();
for (QueryDispatcher.QueryResult.StageCoverage sc : stageCoverage) {
if (sc == null) {
coverageArray.addNull();
} else {
coverageArray.addObject()
.put("responded", sc.getResponded())
.put("mergeFailed", sc.getMergeFailed())
.put("missing", sc.getMissing());
}
}
brokerResponse.setStreamStatsCoverage(coverageArray);
}
}

private BrokerResponse constructMultistageExplainPlan(String sql, String plan, Map<String, String> extraFields) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,18 @@ public class BrokerMeter implements AbstractMetrics.Meter {
public static final BrokerMeter MSE_OPCHAINS_STARTED = create("MSE_OPCHAINS_STARTED", "opchains", true);
public static final BrokerMeter MSE_OPCHAINS_COMPLETED = create("MSE_OPCHAINS_COMPLETED", "opchains", true);

/**
 * Number of MSE queries for which stream-mode stats reporting (the {@code SubmitWithStream} bidi-RPC stats path) was
 * enabled. The broker marks this meter once per query, after the query results have been collected, when the
 * {@code streamStats} query option (or the broker-level default when the option is absent) resolves to {@code true}.
 */
public static final BrokerMeter MSE_STREAM_STATS_QUERIES = create("MSE_STREAM_STATS_QUERIES", "queries", true);

/**
 * Number of MSE stream-mode queries that returned with incomplete stats coverage, i.e. at least one stage reported a
 * non-zero count of missing or merge-failed opchain reports. Marked at most once per query. Cluster operators can
 * alert on this counter to detect persistent stats gaps.
 */
public static final BrokerMeter MSE_STREAM_STATS_INCOMPLETE_COVERAGE =
create("MSE_STREAM_STATS_INCOMPLETE_COVERAGE", "queries", true);

/**
* How many MSE queries have encountered segments with invalid partitions.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -39,7 +40,7 @@
@JsonPropertyOrder({
"resultTable", "numRowsResultSet", "partialResult", "exceptions", "numGroupsLimitReached",
"numGroupsWarningLimitReached", "numGroups", "maxRowsInJoinReached", "maxRowsInJoin",
"maxRowsInWindowReached", "maxRowsInWindow", "timeUsedMs", "stageStats",
"maxRowsInWindowReached", "maxRowsInWindow", "timeUsedMs", "stageStats", "streamStatsCoverage",
"maxRowsInOperator", "requestId", "clientRequestId", "brokerId", "numDocsScanned", "totalDocs",
"numEntriesScannedInFilter", "numEntriesScannedPostFilter", "numServersQueried", "numServersResponded",
"numSegmentsQueried", "numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried",
Expand Down Expand Up @@ -68,6 +69,13 @@ public class BrokerResponseNativeV2 implements BrokerResponse {
* Statistics for each stage of the query execution.
*/
private ObjectNode _stageStats;
/**
* Stream-mode stats coverage, populated only when the query used {@code SubmitWithStream}. An array indexed by stage
* id; each element is an object with {@code responded}, {@code mergeFailed}, and {@code missing} counters, or
* {@code null} for stages that have no coverage info (e.g. stage 0 which runs broker-local).
*/
@JsonInclude(JsonInclude.Include.NON_NULL)
private ArrayNode _streamStatsCoverage;
/**
* The max number of rows seen at runtime.
* <p>
Expand Down Expand Up @@ -208,6 +216,19 @@ public void setStageStats(ObjectNode stageStats) {
_stageStats = stageStats;
}

/**
 * Returns the stream-mode stats coverage, or {@code null} when the query ran in legacy mode. Array indexed by stage
 * id; elements may be {@code null} for stages with no coverage (e.g. stage 0).
 * <p>
 * The {@code NON_NULL} inclusion rule is declared on this getter because Jackson discovers the
 * {@code streamStatsCoverage} property through the accessor. The backing field's implicit name keeps its leading
 * underscore, so an inclusion rule placed only on the field is not linked to this property and would not keep a
 * {@code null} value out of legacy-mode responses.
 *
 * @return the coverage array, or {@code null} if it was never set
 */
@JsonInclude(JsonInclude.Include.NON_NULL)
@Nullable
public ArrayNode getStreamStatsCoverage() {
  return _streamStatsCoverage;
}

/**
 * Sets the stream-mode stats coverage returned by {@link #getStreamStatsCoverage()}.
 *
 * @param streamStatsCoverage coverage array to expose; the reference is stored as-is, without copying
 */
public void setStreamStatsCoverage(ArrayNode streamStatsCoverage) {
_streamStatsCoverage = streamStatsCoverage;
}

/**
* Returns the maximum number of rows seen by a single operator in the query processing chain.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,15 @@ public static boolean isUsePhysicalOptimizer(Map<String, String> queryOptions, b
return option != null ? Boolean.parseBoolean(option) : defaultValue;
}

/**
 * Resolves the {@code streamStats} query option, which opts a single query into the {@code SubmitWithStream}
 * dispatch path. See {@link QueryOptionKey#STREAM_STATS}.
 *
 * @param queryOptions query options in which the key is looked up
 * @param defaultValue value to fall back to when the option is not present
 * @return {@code defaultValue} when the option is absent; otherwise {@code true} only if the option value equals
 *         {@code "true"} ignoring case
 */
public static boolean isStreamStats(Map<String, String> queryOptions, boolean defaultValue) {
  String streamStatsOption = queryOptions.get(QueryOptionKey.STREAM_STATS);
  if (streamStatsOption == null) {
    return defaultValue;
  }
  return Boolean.parseBoolean(streamStatsOption);
}

public static boolean isMultiClusterRoutingEnabled(Map<String, String> queryOptions, boolean defaultValue) {
String option = queryOptions.get(QueryOptionKey.ENABLE_MULTI_CLUSTER_ROUTING);
return option != null ? Boolean.parseBoolean(option) : defaultValue;
Expand Down
89 changes: 88 additions & 1 deletion pinot-common/src/main/proto/worker.proto
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ service PinotQueryWorker {
rpc Cancel(CancelRequest) returns (CancelResponse);

rpc Explain(QueryRequest) returns (stream ExplainResponse);

// Long-lived bidi RPC used by the "stream" stats-reporting mode (S3-variant).
// The broker keeps the stream open for the query lifetime; the server pushes one
// OpChainComplete per opchain it ran on this server, followed by a ServerDone.
// Stats no longer travel through the mailbox in this mode.
// See OpChainComplete / MultiStageStatsTree below for the payload shape.
rpc SubmitWithStream(stream BrokerToServer) returns (stream ServerToBroker);
}

message CancelRequest {
Expand All @@ -38,7 +45,9 @@ message CancelRequest {
}

message CancelResponse {
map<int32, bytes> statsByStage = 1; // stageId -> serialized MultiStageQueryStats.StageStats.Closed
// Deprecated: always empty since the stream-stats push feature was introduced; per-opchain stats
// now flow via OpChainComplete on the SubmitWithStream bidi RPC and are no longer collected on cancel.
map<int32, bytes> statsByStage = 1;
}

// QueryRequest is the dispatched content for all query stages to a physical worker.
Expand Down Expand Up @@ -100,3 +109,81 @@ message MailboxInfo {
message Properties {
map<string, string> property = 1;
}

// =============================================================================
// SubmitWithStream — stream-mode stats reporting messages
// =============================================================================

// Envelope for messages flowing from broker to server on the SubmitWithStream
// stream. Each message carries at most one member of `payload`.
// The first message MUST be a `submit`. Subsequent messages may carry `cancel`.
message BrokerToServer {
oneof payload {
// Plan submission. Same shape as today's unary Submit request.
QueryRequest submit = 1;
// Optional cancel message. In Phase A the broker still uses the unary
// Cancel RPC: the server already processes this field, but the broker only
// starts emitting it in Phase B. (Field number 2 is live; this is unrelated
// to the protobuf `reserved` keyword.)
CancelRequest cancel = 2;
}
}

// Envelope for messages flowing from server to broker on the SubmitWithStream
// stream. Each message carries at most one member of `payload`.
// Expected sequence:
//   1. The first server message is a `submit_ack`.
//   2. Each opchain that runs on this server emits exactly one `opchain`
//      message when it finishes (success or error).
//   3. After the last opchain has reported, the server emits `done` and
//      half-closes.
message ServerToBroker {
oneof payload {
// Synchronous-style ack for the submission. Same shape as today's unary
// QueryResponse. Carries early errors (plan parsing, malformed metadata).
QueryResponse submit_ack = 1;
// Completion report, including per-stage stats, from one opchain that ran
// on this server.
OpChainComplete opchain = 2;
// Final marker — the server has no more messages for this query.
ServerDone done = 3;
}
}
}

// Completion report for a single opchain. Sent once per opchain that ran on
// this server, regardless of success/failure.
message OpChainComplete {
// Id of the stage the reporting opchain belongs to.
int32 stage_id = 1;
// Id of the worker that ran the opchain -- presumably the worker index within
// the stage; confirm against the dispatcher.
int32 worker_id = 2;
// Whether the opchain finished successfully. As a proto3 bool without
// explicit presence, an unset value reads as false.
bool success = 3;
// Populated when success = false; reads as the empty string when unset.
string error_msg = 4;
// Structured, tree-shaped stats payload.
// NOTE(review): whether this is also set when success = false is not stated
// here -- readers should check presence before using it.
MultiStageStatsTree stats = 5;
}

// Final marker on the server-to-broker stream: the server has no more messages
// for this query. Currently carries no fields; fields can be added later
// without breaking existing readers.
message ServerDone {
}

// Multi-stage stats produced by one opchain. Carries the current stage as an
// explicit operator tree, plus any upstream-stage trees this opchain
// accumulated (today these reach an opchain via pipeline-breaker stats and via
// mailbox-receive merges; in stream mode each upstream stage is also reported
// by its own server, so the broker merges duplicates per stage).
message MultiStageStatsTree {
// Id of the stage whose operator tree is carried in `current_stage`.
int32 current_stage_id = 1;
// Root of the operator tree for the current stage.
StageStatsNode current_stage = 2;
// Sparse map of upstream stage id -> tree. Only stages this opchain
// accumulated stats for have an entry; map iteration order on the wire is
// undefined, so readers must not rely on it.
map<int32, StageStatsNode> upstream_stages = 3;
}

// One node of the operator tree for a single stage.
//
// NOTE: this message is self-recursive through `children`. Protobuf parsers
// enforce a nesting limit while decoding (100 levels by default in the Java
// runtime), and consumers that walk the tree should bound the depth they
// accept rather than trust the sender.
message StageStatsNode {
// The MultiStageOperator.Type id, same byte we serialize today via
// MultiStageOperator.Type.getId().
int32 operator_type_id = 1;
// Plan node ids that compile down to this operator. One-to-many; the leaf
// operator typically owns the whole sub-tree of v1 plan nodes below the leaf
// boundary. Stable per request via PlanNode.getNodeId().
repeated int32 plan_node_ids = 2;
// Opaque StatMap bytes -- same encoding the existing StatMap.serialize
// produces. The broker decodes by looking up the StatMap key class via
// MultiStageOperator.Type.fromId(operator_type_id).
bytes stat_map = 3;
// Children in left-to-right order. Arity is whatever the operator dictates
// (1 for transforms, 2 for joins, N for set ops). The tree shape on the wire
// is canonical -- legacy mode keeps using its inorder/implicit-arity
// encoding.
repeated StageStatsNode children = 4;
}
Loading