
[multistage] Push stage stats to the broker via SubmitWithStream bidi RPC (issue #18375) #18458

Open

gortiz wants to merge 27 commits into apache:master from gortiz:feature/mse-push-stats

Conversation

@gortiz (Contributor) commented May 11, 2026

Summary

Implements the proposal from #18375: a new opt-in mode where servers push per-opchain stats directly to the broker over a long-lived bidi gRPC stream, instead of piggy-backing them onto the mailbox just before EOS.

Resolves #18375.

What the issue proposed vs. what is implemented

The implementation follows the issue's proposal closely, with a few clarifications:

| Area | Issue proposal | This PR |
| --- | --- | --- |
| New RPC | SubmitWithStream(stream BrokerToServer) returns (stream ServerToBroker) | ✅ Implemented as specified |
| BrokerToServer | submit + cancel payloads | ✅ Both implemented; cancel routes to QueryRunner.cancel() and also fires on stream-close |
| ServerToBroker | submit_ack + OpChainComplete + ServerDone | ✅ Implemented as specified |
| MultiStageStatsTree / StageStatsNode | Structured tree-shaped payload with operator type, plan-node ids, stat bytes, children | ✅ Implemented; encoder walks the live operator tree + flat MultiStageQueryStats lists; decoder produces a StageStatsTreeNode accumulator on the broker |
| Op→PlanNode mapping | Captured via existing BiConsumer tracker in PlanNodeToOpChain; leaf sub-tree walked | ✅ Implemented on OpChainExecutionContext |
| Stats in mailbox | Suppressed when stream mode is active | ✅ Server sends EOS without stats when SubmitWithStream is in use |
| Broker accumulator | Per-stage Map<Integer, StageStatsNode> with StatMap.merge per node; tree-shape mismatch → mergeFailed | StreamingQuerySession owns the accumulator and per-stage coverage counters |
| Wait window | After data-mailbox EOS, broker waits up to min(50 ms, remaining timeout) for outstanding OpChainCompletes | ✅ Success path (submitAndReduceWithStream) waits up to the full remaining query timeout; error path (tryRecoverWithStream) caps at 50 ms since the result is already determined |
| Fan-out cancel | On first peer error, broker sends BrokerToServer.cancel on all open streams | StreamingQuerySession.fanOutCancel() |
| Per-stage coverage | responded / mergeFailed / missing exposed in broker response as streamStatsCoverage[] | ✅ Exposed on QueryResult and broker JSON response |
| Config | Cluster-level pinot.broker.multistage.use.stream.stats.reporting + per-query useStreamStatsReporting option | ✅ Implemented; default is false (legacy mode unchanged) |
| Fallback | Broker detects UNIMPLEMENTED and falls back to legacy for that query | ✅ Implemented in DispatchClient |

Notable divergences / implementation decisions not in the issue:

  • ServerDone is implicit, not a separate message. The server half-closes the outbound stream (calls onCompleted()) after the last OpChainComplete rather than sending a dedicated ServerDone message. This is equivalent and avoids an extra round-trip.
  • Cancel via stream replaces the unary Cancel RPC for stream-mode queries (Phase B). The separate Cancel RPC is kept alive for one release for backward compatibility but its response no longer carries stats.
  • O(1) cancel in OpChainSchedulerService (all modes). The previous cancel() scanned _opChainCache O(n). A new _executionContextByRequest map + _activeOpChainsByRequest reference-counter enable direct QueryExecutionContext.terminate() without a scan. The write lock is acquired before the compute() eviction to close the race window between context removal and cancelled-query cache write.
  • tryRecoverWithStream fans out cancel and then waits up to 50 ms for any remaining OpChainComplete stats (capped short because the result is already determined). The success path (submitAndReduceWithStream) uses the full remaining timeout instead.
  • N-ary set-op tree reconstruction is now correct. The legacy InStageStatsTreeBuilder heuristic (implicit arity) was lossy for set ops with more than two inputs. The new MultiStageStatsTree format carries the tree shape explicitly, so the broker reconstructs it exactly regardless of arity.
  • OperatorTypeDescriptor SPI for plugin-defined operators. MultiStageOperator.Type now implements a new OperatorTypeDescriptor interface. An OperatorTypeRegistry (populated from Type.values() at startup, then extended via ServiceLoader) lets plugin jars register custom operator types without modifying core. Plugin descriptors override mergeInto() to control how their stats appear in the broker response, and can override updateServerMetrics() to push custom server-side metrics. Plugin ids must be ≥ 256 (ids 0–255 are reserved for built-ins).
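To illustrate the SPI shape, a plugin-provided descriptor might look roughly like the following. The method list comes from the PR description above, but the exact parameter types are assumptions, and BloomJoinOperatorType / BloomJoinStatKey are hypothetical names.

```java
// Hypothetical plugin operator type. Signatures beyond getId()/name() are assumed from the
// method list above, not copied from the PR; Pinot imports are omitted for brevity.
public class BloomJoinOperatorType implements OperatorTypeDescriptor {
  @Override
  public int getId() {
    return 300;  // plugin ids must be >= 256; ids 0-255 are reserved for built-in Type values
  }

  @Override
  public String name() {
    return "BLOOM_JOIN";
  }

  @Override
  public Class<? extends StatMap.Key> getStatKeyClass() {
    return BloomJoinStatKey.class;  // hypothetical StatMap key enum for this operator
  }

  @Override
  public void mergeInto(BrokerResponseNativeV2 response, StatMap<?> statMap) {
    // controls how this operator's stats surface in the broker response
  }

  @Override
  public void updateServerMetrics(StatMap<?> statMap, ServerMetrics serverMetrics) {
    // optional: emit custom server-side metrics
  }
}
```

Discovery would go through the standard ServiceLoader route: the plugin jar lists the implementation class under META-INF/services so that OperatorTypeRegistry can pick it up at startup.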

Key files

| File | Role |
| --- | --- |
| pinot-common/src/main/proto/worker.proto | New RPC + BrokerToServer / ServerToBroker / OpChainComplete / MultiStageStatsTree / StageStatsNode messages |
| QueryServer.java | SubmitWithStream handler: plan submission, opchain completion callbacks, cancel-via-stream, stream-close-as-cancel |
| MultiStageStatsTreeEncoder.java | Server-side: walks live operator tree + MultiStageQueryStats flat lists → MultiStageStatsTree proto |
| MultiStageStatsTreeDecoder.java | Broker-side: decodes proto → StageStatsTreeNode accumulator; detects shape mismatches |
| StreamingQuerySession.java | Per-query accumulator: per-stage tree merge, coverage counters, completion latch, fan-out cancel |
| QueryDispatcher.java | Opens SubmitWithStream streams; wait window; error recovery; stats coverage in response |
| OpChainSchedulerService.java | O(1) cancel; completion listener registration for stream-mode stats push |
| OpChainExecutionContext.java | Op→PlanNode map captured at construction time |
| OperatorTypeDescriptor.java | SPI interface: getId(), name(), getStatKeyClass(), mergeInto(), updateServerMetrics() |
| OperatorTypeRegistry.java | Static registry: built-ins loaded from Type.values(), plugins discovered via ServiceLoader |
| StreamStatsReportingIntegrationTest.java | Integration tests: simple aggregation, join (≥3 stages), three-way UNION (N-ary set op regression), cluster-level config activation |

Test plan

  • StreamStatsReportingIntegrationTest — 4 tests covering simple aggregation, join, three-way UNION (N-ary set-op regression), cluster-level config; all pass
  • MultiStageStatsTreeEncoderTest / MultiStageStatsTreeDecoderTest — round-trip, merge, shape-mismatch handling
  • StreamingQuerySessionTest — coverage counters, fan-out cancel, latch semantics
  • OpChainSchedulerServiceTest — O(1) cancel, multi-opchain context cleanup, completion listener lifecycle
  • DispatchClientTest / StreamingDispatchObserverTest — SubmitWithStream client-side observer, UNIMPLEMENTED fallback
  • OperatorTypeRegistryTest — all 16 built-in types registered, unknown id returns null, descriptor methods delegate to enum
  • Legacy mode unchanged: all existing MSE integration tests unaffected (stream mode is opt-in, default off)

🤖 Generated with Claude Code

gortiz and others added 16 commits April 30, 2026 19:35
…initions

Phase A.0 of apache#18375. Purely additive proto changes — new long-lived bidi
RPC alongside the existing unary Submit/Cancel, plus the message types
(BrokerToServer, ServerToBroker, OpChainComplete, ServerDone,
MultiStageStatsTree, StageStatsNode) needed by the upcoming stream-mode
stats reporting path.

No behavior change yet: nothing implements or invokes the new RPC.
Generated Java + gRPC stubs verified to compile.
Phase A.1.a of apache#18375. The existing PlanNodeToOpChain BiConsumer tracker
now also populates a per-opchain Map<MultiStageOperator, List<PlanNode>>
on OpChainExecutionContext. Cardinality is one-to-many: intermediate
operators map 1:1 to their PlanNode; the leaf operator records the
whole sub-tree of v1 plan nodes below the leaf-stage boundary (walked
once at construction).

This will be consumed by the upcoming MultiStageStatsTreeEncoder
(stream-mode stats reporting). No behavior change yet for legacy mode.
…orting

Phase A.1.b of apache#18375. Walks an opchain's live operator tree in lock-step
with the flat MultiStageQueryStats lists (already maintained in inorder)
to produce a tree-shaped MultiStageStatsTree proto. Each node carries
the operator type id, opaque StatMap bytes, recursive children, and the
stage-scoped plan-node ids gathered during opchain construction.

PlanNodeToOpChain now also performs a deterministic pre-order walk over
the stage's plan tree, assigning each PlanNode a sequential integer id
on OpChainExecutionContext. Both broker and server perform the same walk
on the same plan structure, so the ids match without being serialized
on the wire.

The encoder has a Function<MultiStageOperator, List<Integer>> overload
used by tests so they don't need to construct a full
OpChainExecutionContext. Production calls go through the
OpChainExecutionContext-resolving overload.

Coverage: linear chain ordering, N-ary set-op (3 inputs) with
left-to-right child order, leaf operator one-to-many planNodeIds fan-out,
tree/flat-list mismatch detection.
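As a rough sketch of the deterministic walk described above (assuming PlanNode.getInputs() returns children in a stable order; the class and helper names are illustrative):

```java
import java.util.IdentityHashMap;
import java.util.Map;

// Pre-order walk assigning each PlanNode a sequential id. Because broker and server run the
// same walk over the same plan structure, the ids agree without being put on the wire.
final class PlanNodeIdWalk {
  static Map<PlanNode, Integer> assignIds(PlanNode stageRoot) {
    Map<PlanNode, Integer> ids = new IdentityHashMap<>();  // identity-based: PlanNode equality is structural
    visit(stageRoot, ids);
    return ids;
  }

  private static void visit(PlanNode node, Map<PlanNode, Integer> ids) {
    ids.put(node, ids.size());            // parent numbered before its children
    for (PlanNode input : node.getInputs()) {
      visit(input, ids);                  // left-to-right child order keeps the walk deterministic
    }
  }
}
```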
Phase A.1.c of apache#18375. Broker-side counterpart to MultiStageStatsTreeEncoder:

- StageStatsTreeNode: in-memory view of a decoded per-stage operator
  tree, mutable, with a recursive merge(StageStatsTreeNode) that sums
  StatMaps node-by-node. Throws ShapeMismatchException when the two
  trees disagree on operator type or arity at any position.
- MultiStageStatsTreeDecoder: turns a Worker.MultiStageStatsTree proto
  into a Decoded(currentStageId, currentStage, upstreamStages) value.
  Throws DecodeFailedException on unknown operator type ids — gives the
  broker a typed signal to mark mergeFailed for that stage and continue.

Coverage:
- decode round-trip preserves type, planNodeIds, statMap, children
- merge same-shape sums counters
- merge type/arity mismatch throws ShapeMismatchException
- decode unknown operator type id throws DecodeFailedException
  (mixed-version safety: newer server, older broker)
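A minimal sketch of the node-by-node merge semantics described above (field names are illustrative, not the actual implementation):

```java
// Recursive merge: sums StatMaps position-by-position and refuses to merge trees whose
// shape (operator type or arity) disagrees anywhere. Pinot imports omitted for brevity.
void merge(StageStatsTreeNode other) {
  if (_type.getId() != other._type.getId() || _children.size() != other._children.size()) {
    throw new ShapeMismatchException("Operator type or arity differs: " + _type.getId() + "/"
        + _children.size() + " vs " + other._type.getId() + "/" + other._children.size());
  }
  _statMap.merge(other._statMap);                       // StatMap.merge sums counters key by key
  for (int i = 0; i < _children.size(); i++) {
    _children.get(i).merge(other._children.get(i));     // recurse in child order
  }
}
```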
Phase A.2.0 of apache#18375. Wires up the new bidi RPC's gRPC mechanics:

- New per-call SubmitWithStreamObserver handles inbound BrokerToServer
  messages. First message must be `submit`; subsequent `cancel` is
  routed to QueryRunner.cancel; broker stream-close also cancels.
- Plan submission runs on the existing _submissionExecutorService via
  submitInternal, identical to the unary Submit path. The submit_ack
  message replaces today's unary QueryResponse.
- Response stream onNext calls are serialised under a per-call lock
  (gRPC requires onNext to be called serially).

OpChainComplete / ServerDone emission is deferred: that needs a per-
opchain completion hook on OpChainSchedulerService which is added in
the next sub-commit. Today this skeleton runs the query end-to-end but
loses stream-mode stats — only the broker-side ack and cancel-via-
stream-close paths are useful as-is.
Phase A.2.1 of apache#18375. Adds the per-opchain completion hook that the
stream-mode handler needs to emit OpChainComplete and ServerDone.

OpChainSchedulerService:
- New ConcurrentMap<requestId, OpChainCompletionListener> registry.
- registerCompletionListener / unregisterCompletionListener methods.
- registerInternal captures stats in an AtomicReference inside the
  TraceRunnable and the FutureCallback invokes the listener with the
  captured stats on both success and error paths. Listener exceptions
  never block opchain teardown.

QueryRunner exposes registerOpChainCompletionListener /
unregisterOpChainCompletionListener as passthroughs.

QueryServer.SubmitWithStreamObserver:
- Counts expected opchains for the request (sum of WorkerMetadata
  across all stage plans).
- Registers the listener BEFORE submitting (avoids race where short
  opchains finish before registration).
- Each event encodes via MultiStageStatsTreeEncoder, builds an
  OpChainComplete proto, and emits on the response stream under the
  per-call lock.
- Encoding failures (e.g. tree/flat-list mismatch on error path) are
  surfaced on OpChainComplete with success=false + error_msg, never
  block stream completion.
- After all expected opchains have reported, unregisters the listener
  and emits ServerDone, closing the stream.

Tests cover listener fire on success, fire on error, and no-fire after
unregister.
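The ordering is the important part; a sketch under assumed listener and helper signatures (countExpectedOpChains, sendOpChainComplete and sendDoneAndComplete are illustrative, not the PR's actual methods):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Register the completion listener before handing the plan to the submission executor, so an
// opchain that finishes immediately still reaches the listener. Pinot types/imports omitted.
void handleSubmit(Worker.QueryRequest request) {
  long requestId = getRequestId(request);                        // assumed accessor
  int expected = countExpectedOpChains(request);                 // sum of WorkerMetadata across stage plans
  AtomicInteger reported = new AtomicInteger();

  _queryRunner.registerOpChainCompletionListener(requestId, (opChainId, stats, error) -> {
    sendOpChainComplete(opChainId, stats, error);                // encode + onNext under the per-call lock
    if (reported.incrementAndGet() == expected) {
      _queryRunner.unregisterOpChainCompletionListener(requestId);
      sendDoneAndComplete();                                     // implicit ServerDone: half-close the stream
    }
  });

  submitInternal(request);                                       // only now start the opchains
}
```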
Phase A.2.2 of apache#18375. In stream mode the broker collects stats out-of-
band on the bidi RPC via OpChainComplete, so paying the cost of also
serializing them onto the mailbox EOS path is wasteful. Per-request
override:

- New CommonConstants.MultiStageQueryRunner.KEY_OF_STATS_REPORTING_MODE
  metadata key with values STATS_REPORTING_MODE_STREAM / _LEGACY.
- QueryServer.SubmitWithStreamObserver.handleSubmit injects the key
  with value "stream" into reqMetadata before invoking submitInternal.
- QueryRunner.processQueryBlocking computes effectiveSendStats(...)
  from the metadata, forcing sendStats=false when the key is "stream".
  All downstream consumers (PipelineBreakerExecutor, OpChainExecution-
  Context, error path) use the effective value.
- Comment added on QueryRunner._sendStats noting the per-request
  override; effectiveSendStats() Javadoc points back to the metadata
  key.

Not user-facing: brokers do not set this key; it exists purely as an
internal channel from the SubmitWithStream handler to the runner.
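A sketch of the override logic this describes (constant names are taken from the text above; the method shape is assumed):

```java
import java.util.Map;

// Force sendStats off when the SubmitWithStream handler tagged the request as stream-mode;
// otherwise fall back to the instance-level flag, so legacy behavior is untouched.
private boolean effectiveSendStats(Map<String, String> requestMetadata) {
  String mode = requestMetadata.get(CommonConstants.MultiStageQueryRunner.KEY_OF_STATS_REPORTING_MODE);
  if (CommonConstants.MultiStageQueryRunner.STATS_REPORTING_MODE_STREAM.equals(mode)) {
    return false;  // stats travel out-of-band via OpChainComplete; skip mailbox EOS serialization
  }
  return _sendStats;
}
```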
Phase A.3.0 of apache#18375. Per-query state container for the broker's
SubmitWithStream dispatch path:

- StreamingQuerySession owns the per-stage tree accumulator
  (Map<stageId, StageStatsTreeNode>), per-stage counters
  (responded / mergeFailed), open-streams set, and a CountDownLatch
  initialised to expectedOpChains.
- Completion semantics: awaitCompletion(timeout, unit) returns true as
  soon as every expected opchain has reported (early completion — the
  common case in stream mode), and false if the timeout fires first.
  The dispatcher will only call this AFTER the broker receiving mailbox
  has finished, so a `true` return means "data done AND stats fully
  accounted for".
- recordOpChainComplete decodes the MultiStageStatsTree, merges into
  the per-stage accumulator (StatMap.merge), and on shape mismatch /
  decode failure marks the stage mergeFailed and continues. success=
  false reports trigger fan-out cancel exactly once.
- recordStreamError variant lets the dispatcher drain the latch by the
  number of opchains a dead server still owed; also fires fan-out
  cancel.
- StreamingServerHandle is the abstraction over an open server stream
  used for cancel fan-out, so the session doesn't depend on concrete
  gRPC stub types.

Coverage (snapshotCoverage()) is exposed as an immutable view of
responded / mergeFailed counts plus the merged accumulator. Missing
opchains are computed by the caller against the expected total.

Tests cover early completion (returns immediately when all reports
arrived before the wait window), timeout fall-through, cross-worker
stats sum, fan-out cancel idempotency on peer error, stream-error
draining the latch by remainingExpected, and concurrent reports
across 50 threads.
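The completion semantics boil down to a latch sized to the expected opchain count; a stripped-down sketch (the real class additionally tracks per-stage counters, open streams, and the merged accumulator):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class CompletionLatchSketch {
  private final CountDownLatch _completionLatch;

  CompletionLatchSketch(int expectedOpChains) {
    _completionLatch = new CountDownLatch(expectedOpChains);
  }

  // Called once per OpChainComplete, and once per opchain a dead server still owed
  // (recordStreamError drains by the remaining-expected count).
  void opChainAccountedFor() {
    _completionLatch.countDown();
  }

  /** True as soon as every expected opchain has reported; false if the timeout fires first. */
  boolean awaitCompletion(long timeout, TimeUnit unit) throws InterruptedException {
    return _completionLatch.await(timeout, unit);
  }
}
```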
Phase A.3.1 of apache#18375. Broker-side gRPC plumbing for opening
SubmitWithStream bidi calls.

StreamingDispatchObserver:
- Implements StreamObserver<ServerToBroker> for the inbound side and
  StreamingServerHandle for the outbound side (cancel fan-out).
- First ServerToBroker message must be submit_ack — passed to the
  caller's BiConsumer<QueryResponse, Throwable>. Subsequent OpChain
  messages forward to the session; ServerDone unregisters this stream
  from the session's open-set; onError drains the session's latch by
  the per-server remaining-expected count.

DispatchClient.submitWithStream:
- Opens the bidi RPC, sends the initial submit, registers the observer
  with the session, returns the observer for the caller to track.
- Uses BiConsumer rather than the package-private AsyncResponse so the
  streaming subpackage doesn't depend on AsyncResponse visibility.

No high-level integration into submitAndReduce yet — that's A.3.2.
Phase A.3 follow-up. Unit-level coverage for the broker-side gRPC
observer added in the previous commit:

- Happy path: submit_ack → 2 OpChainComplete → ServerDone, with the
  ack callback firing once and the session latch draining to zero.
- onError before submit_ack: ack callback receives the error so the
  dispatcher's ack-await doesn't hang; latch drains by remaining-
  expected.
- Cancel emits a BrokerToServer.cancel onto the outbound stream.
- onError after partial reports drains only the remaining-expected
  count, not the full per-server count (no double-decrement).
Phase A.3.2 of apache#18375. Broker-side end-to-end integration of the
streaming dispatch path, gated by a per-query option.

Per-query option:
- New QueryOptionKey.USE_STREAM_STATS_REPORTING ("useStreamStatsReporting")
- QueryOptionsUtils.isUseStreamStatsReporting reads it.

QueryDispatcher.submitAndReduce branches on the option:
- Legacy path unchanged.
- New submitAndReduceWithStream:
  1. Sizes a StreamingQuerySession with expectedOpChains = sum across
     non-root stages of total workers.
  2. Calls submitWithStream(...) which opens one SubmitWithStream bidi
     RPC per server via DispatchClient.submitWithStream (computing the
     per-server expected opchain count for correct latch draining on
     stream errors). Awaits all submit_acks via the existing
     processResults/ack-queue pattern.
  3. Runs runReducer() -- broker's stage 0 mailbox-receive -- exactly
     as today.
  4. After mailbox EOS, awaits stats with early-completion semantics:
     session.awaitCompletion(remainingTimeoutMs, MILLISECONDS) returns
     true the moment every expected opchain has reported, false on
     timeout. Either way the broker proceeds to build the result.
  5. Merges session stats into broker's local stage-0 stats via the
     new StageStatsTreeNode.flattenInorder() helper and a new
     QueryResult constructor that takes a pre-built per-stage list.

Where both broker (pipeline-breaker) and session (server-reported)
have an entry for the same stage, the session wins -- avoids double-
counting stats that the upstream server also reported.

Default for the option is false, so legacy behavior is preserved
unless the broker explicitly opts in.
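The precedence rule above amounts to a put-all in the session's favor; a small sketch (map names are illustrative):

```java
import java.util.HashMap;
import java.util.Map;

// Broker-local (pipeline-breaker) stats seed the map; server-reported stats from the session
// overwrite them for any stage both sides know about, so nothing is counted twice.
Map<Integer, StageStatsTreeNode> statsByStage = new HashMap<>(brokerLocalStatsByStage);
statsByStage.putAll(sessionReportedStatsByStage);   // session wins on collisions
```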
Phase A.3 follow-up. Make the no-fallback design choice explicit in
both the per-query option Javadoc and on submitAndReduceWithStream:

- Enabling useStreamStatsReporting requires every server in the
  cluster to support SubmitWithStream. Operators are responsible for
  setting it only after the fleet is fully upgraded.
- If any server returns UNIMPLEMENTED (or any other transport error)
  during dispatch, the broker cancels the query and surfaces the
  error to the client.

No code change — the existing path already does this: the failing
ack lands on the queue with a non-null throwable, processResults
throws, tryRecover catches and calls cancel(requestId, servers)
before rethrowing. Just clarifying that this is the intended behavior
rather than a fallback to add later.
Adds pinot.broker.multistage.use.stream.stats.reporting (default false)
so operators can enable stream-mode stats for all queries cluster-wide
without requiring per-query opt-in. Individual queries can still override
via the useStreamStatsReporting query option.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…r response

Stream-mode queries now record how many opchain reports were received,
how many failed to merge (shape mismatch / decode error), and how many
went missing (timed out or stream error). The counts are carried by a new
QueryResult.StageCoverage list and rendered as a streamStatsCoverage JSON
array in BrokerResponseNativeV2, indexed by stage id, so consumers can see
exactly which stages delivered partial stats.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…espace

Integration test exercises the SubmitWithStream stats path end-to-end:
- Simple aggregation, join (multi-stage), and three-way UNION (N-ary set op)
  all run with useStreamStatsReporting=true and verify results are correct
  and streamStatsCoverage shows full coverage (responded>0, missing=0, mergeFailed=0).
- Cluster-level config test uses overrideBrokerConf to enable stream stats
  for all queries without a per-query option.

Also fixes the config key namespace: rename from
pinot.broker.multistage.use.stream.stats.reporting to
pinot.broker.mse.use.stream.stats.reporting, consistent with all other
MSE broker config keys in CommonConstants.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ulerService

Folds query cancellation onto the SubmitWithStream bidi RPC so the broker
does not need a separate unary Cancel call in stream mode, and improves
cancel throughput for all modes via direct QueryExecutionContext lookup.

Broker side:
- fanOutCancel() is now public on StreamingQuerySession and is used
  everywhere the broker previously called the unary Cancel RPC while in
  stream mode (ack errors, processing exceptions, query timeout).
- tryRecoverWithStream() fans out cancel then waits for remaining
  OpChainComplete stats up to the query deadline before building the
  error QueryResult.
- cancelWithStats() removed; on the legacy path cancel no longer
  collects stats synchronously (error-path stats arrive via
  OpChainCompletionListener in stream mode).

Server side:
- SubmitWithStream handler already routes BrokerToServer.cancel to
  QueryRunner.cancel(); unary Cancel RPC is simplified (no longer
  serialises stats into the response).
- QueryRunner.cancel() / OpChainSchedulerService.cancel() return void.

OpChainSchedulerService O(1) cancel (all modes):
- Two new maps (_executionContextByRequest, _activeOpChainsByRequest)
  replace the previous O(n) opChainCache scan in cancel().
- All mutations of both maps for the same requestId are serialised via
  ConcurrentHashMap.compute() on the counter map; cancel() acquires the
  query write lock before compute() to close the window where a
  concurrent register() could slip through the cache check.
- _opChainCache entries are invalidated promptly on cancel to release
  operator memory without waiting for TTL expiry.
- Belt-and-suspenders: _cancelledQueryCache is always written on cancel
  regardless of whether a live context was found.
- New test shouldCleanUpContextAfterAllOpChainsComplete verifies that
  the context map is empty after all opchains for a multi-opchain
  request complete; existing cancel test asserts activeRequestCount()==0
  immediately after cancel() returns.

The unary Cancel RPC is kept for one release of overlap per plan.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@codecov-commenter commented May 11, 2026

Codecov Report

❌ Patch coverage is 48.44633% with 365 lines in your changes missing coverage. Please review.
✅ Project coverage is 63.63%. Comparing base (bd76ff0) to head (886ab99).
⚠️ Report is 5 commits behind head on master.

| Files with missing lines | Patch % | Lines |
| --- | --- | --- |
| .../pinot/query/service/dispatch/QueryDispatcher.java | 2.96% | 130 Missing and 1 partial ⚠️ |
| ...apache/pinot/query/service/server/QueryServer.java | 1.56% | 126 Missing ⚠️ |
| ...requesthandler/MultiStageBrokerRequestHandler.java | 4.16% | 23 Missing ⚠️ |
| ...e/pinot/query/runtime/plan/StageStatsTreeNode.java | 46.34% | 21 Missing and 1 partial ⚠️ |
| .../dispatch/streaming/StreamingDispatchObserver.java | 70.00% | 12 Missing and 3 partials ⚠️ |
| ...va/org/apache/pinot/query/runtime/QueryRunner.java | 46.15% | 6 Missing and 1 partial ⚠️ |
| ...vice/dispatch/streaming/StreamingQuerySession.java | 93.20% | 6 Missing and 1 partial ⚠️ |
| ...t/query/runtime/operator/OperatorTypeRegistry.java | 53.84% | 5 Missing and 1 partial ⚠️ |
| ...e/pinot/query/service/dispatch/DispatchClient.java | 0.00% | 6 Missing ⚠️ |
| ...query/runtime/plan/MultiStageStatsTreeEncoder.java | 87.50% | 4 Missing and 1 partial ⚠️ |
| ... and 8 more | | |
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #18458      +/-   ##
============================================
- Coverage     63.68%   63.63%   -0.05%     
+ Complexity     1685     1684       -1     
============================================
  Files          3266     3273       +7     
  Lines        199821   200459     +638     
  Branches      31022    31108      +86     
============================================
+ Hits         127257   127564     +307     
- Misses        62425    62742     +317     
- Partials      10139    10153      +14     
Flag Coverage Δ
custom-integration1 100.00% <ø> (ø)
integration 100.00% <ø> (ø)
integration1 100.00% <ø> (ø)
integration2 0.00% <ø> (ø)
java-21 63.63% <48.44%> (-0.05%) ⬇️
temurin 63.63% <48.44%> (-0.05%) ⬇️
unittests 63.63% <48.44%> (-0.05%) ⬇️
unittests1 55.81% <50.00%> (-0.02%) ⬇️
unittests2 34.83% <1.27%> (-0.12%) ⬇️


- StageStatsTreeNode: replace ImmutableList.copyOf with List.copyOf
  to eliminate the banned Guava import
- DispatchClient: wrap @return Javadoc line to stay under 120 chars
- StreamingDispatchObserverTest: remove blank line before closing brace

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@xiangfu0 (Contributor) left a comment

See inline comment.

gortiz and others added 4 commits May 12, 2026 15:34
Extract MultiStageOperator.Type into an OperatorTypeDescriptor interface so
plugin-defined operators can participate in the stream-mode stats path without
modifying core. The Type enum implements the interface unchanged; a new
OperatorTypeRegistry loads built-in types at startup and discovers plugins via
ServiceLoader (META-INF/services). Plugin ids must be >= 256.

Changed files:
- OperatorTypeDescriptor (new) — SPI interface
- OperatorTypeRegistry (new) — static registry, ServiceLoader-discovered
- MultiStageOperator.Type — implements OperatorTypeDescriptor
- MultiStageOperator.getOperatorType() — return type widened to interface
- MultiStageQueryStats._operatorTypes — List<OperatorTypeDescriptor>
- MultiStageStatsTreeDecoder — uses OperatorTypeRegistry.fromId()
- MultiStageStatsTreeEncoder — local type var is OperatorTypeDescriptor
- StageStatsTreeNode — _type is OperatorTypeDescriptor; merge() uses getId()
- InStageStatsTreeBuilder — local vars widened to OperatorTypeDescriptor
- OperatorTypeRegistryTest (new) — verifies all 16 built-ins present, unknown id returns null

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…tryTest

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Three new tests in StreamingQuerySessionTest verify that query completion
is not blocked when stats cannot be decoded or merged:
- Unknown operator type id (DecodeFailedException) → mergeFailed, latch drains
- Shape mismatch between workers (ShapeMismatchException) → mergeFailed, first
  worker's stats preserved
- Corrupted/truncated stat bytes (EOFException → DecodeFailedException) →
  mergeFailed, latch drains

Rename useStreamStatsReporting/CONFIG_OF_USE_STREAM_STATS_REPORTING to
streamStats/CONFIG_OF_STREAM_STATS for brevity across all call sites.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ptions to Calcite rules

PlannerContext now implements org.apache.calcite.plan.Context so that
Calcite rules registered via the RuleSetCustomizer SPI can gate on
per-query options without reflection:

  PlannerContext ctx = call.getPlanner().getContext().unwrap(PlannerContext.class);
  if (ctx != null) { String v = ctx.getOptions().get("myOption"); }

Changes:
- PlannerContext implements Context; overrides unwrap():
  - clazz.isInstance(this) → returns this (covers PlannerContext and Context)
  - clazz.isInstance(_envConfig) → delegates to envConfig (QueryEnvironment.Config
    backward compat for PinotLogicalAggregateRule / PinotAggregateExchangeNodeInsertRule)
  - otherwise → null
- Both LogicalPlanner constructors now receive `this` instead of
  Contexts.EMPTY_CONTEXT / Contexts.of(envConfig).
- PlannerContext.forTesting(options, envConfig) static factory + package-private
  two-arg constructor (no-op planners) for unit tests that don't need a full
  QueryEnvironment.
- close() guards against null _planner (test constructor sets it to null).
- PlannerContextTest: 6 TestNG tests covering unwrap contract and planner delegation.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
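The unwrap contract described above maps to a few instanceof checks; a sketch (Calcite's Context.unwrap is generic, so the casts go through Class.cast):

```java
// Sketch of the delegation order: PlannerContext/Context first, then the QueryEnvironment.Config
// backward-compat path, then null. Pinot and Calcite imports omitted for brevity.
@Override
public <T> T unwrap(Class<T> clazz) {
  if (clazz.isInstance(this)) {
    return clazz.cast(this);          // covers PlannerContext and org.apache.calcite.plan.Context
  }
  if (clazz.isInstance(_envConfig)) {
    return clazz.cast(_envConfig);    // PinotLogicalAggregateRule / PinotAggregateExchangeNodeInsertRule compat
  }
  return null;
}
```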
gortiz and others added 3 commits May 13, 2026 14:08
… reports fewer opchains than expected

When a server sends submit_ack(STATUS_ERROR) followed by ServerDone
without any opchains, the previous DONE handler only unregistered the
stream but never decremented the completion latch — causing
awaitCompletion to block until the query deadline rather than returning
promptly on submission failure.

The fix: compute remaining = expected - reported; if positive, call
recordStreamError so the latch drains immediately and fan-out cancel
fires (the query is doomed if a server couldn't start).  After draining,
advance _opChainsReportedForThisServer to _expectedOpChainsForThisServer
so a subsequent onError (stream reset after DONE delivery) cannot
double-drain the latch.

Also added:
- Javadoc on _opChainsReportedForThisServer documenting the
  single-threaded gRPC callback invariant (no extra synchronization
  needed).
- Regression test: STATUS_ERROR ack + DONE with 0 reported opchains
  → awaitCompletion returns within 1 s.
- Regression test: 1 of 3 opchains reported + DONE → remaining 2
  counts drained correctly.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ction

Two new tests:

1. testOnErrorAfterDoneWithRemainingDoesNotDoubleDrain: verifies that when
   a late onError arrives after DONE already drained remaining latch counts,
   the latch stays at 0 (no double-drain). This exercises the guard added in
   the previous commit (_opChainsReportedForThisServer = _expectedOpChainsForThisServer).

2. testOnCompletedAfterCleanDoneIsNoOp: verifies that the gRPC onCompleted
   callback arriving after a clean DONE (remaining == 0) is idempotent —
   unregisterStream is a no-op on an already-removed stream.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@yashmayya added the multi-stage (Related to the multi-stage query engine) and feature (New functionality) labels on May 13, 2026
@yashmayya (Contributor) left a comment

Review focused on performance/throughput overhead vs the existing legacy path (per the discussion on #18375). Architecture-wise this is sound — data path is unchanged, payload sizes are comparable, broker accumulator semantics look correct. But there are a handful of concrete things in this implementation that will cap throughput / regress latency at high QPS before legacy mode would, and that I'd want to see addressed before flipping _streamStats=true by default.

Headline concerns (each has an inline below):

  • No 50ms wait window cap — submitAndReduceWithStream waits the full remaining query timeout for outstanding OpChainCompletes. A single stuck opchain blocks the client response until query deadline. The design doc on #18375 explicitly specified a min(50ms, remainingTimeout) hard cap.
  • HTTP/2 stream limit — one ManagedChannel per server, no MAX_CONCURRENT_STREAMS bump. Long-lived streams now hold HTTP/2 slots for the full query lifetime → silent throughput cap at high QPS × many servers.
  • Decode + merge on Netty event loops under a per-query lock — cross-query head-of-line blocking risk at high QPS.
  • Error path also waits full timeout — strict regression vs legacy, which returns immediately on cancel.

Non-blocking but worth fixing before scale-up:

  • Cancel fan-out is undebounced (transient errors amplify into cancel storms across N peers).
  • Encoder failure on error path ⇒ broker has strictly less stats coverage than legacy on that opchain.
  • Per-opchain IdentityHashMap allocations now run on the legacy hot path too.
  • decodeNode has no depth guard.

Suggested validation before defaulting on: representative high-QPS soak (e.g. 5K QPS × 200 servers) measuring p50/p99/throughput in legacy vs stream; capture heap delta and Netty event-loop stall metrics. The success path of stream mode is comparable to legacy in theory — the threading + connection-cap details below need to be sorted for that to hold in practice. No correctness blockers — merge semantics, coverage accounting, O(1) cancel, and N-ary set-op reconstruction all look right.

// Receiving mailbox finished. Wait for stats: returns true as soon as every opchain has reported, or false
// when the timeout fires.
long remainingMs = Math.max(0, deadlineMs - System.currentTimeMillis());
boolean fullCoverage = session.awaitCompletion(remainingMs, TimeUnit.MILLISECONDS);
Contributor

No wait-window cap (latency regression). This uses the full remaining query timeout, but the design doc on #18375 specified min(50ms, remainingTimeout) as a hard cap. As written, a single stuck opchain on any server holds the broker response back until the entire query budget elapses — e.g. a 10s-timeout query that has data in hand at t=100ms but is missing one OpChainComplete will return to the client at t=10000ms instead of ~t=150ms.

For p99-sensitive workloads this is a material regression vs legacy mode. Suggest a configurable cap (min(STATS_WAIT_CAP_MS, remainingMs)) with a 50–100ms default. Same issue applies in tryRecoverWithStream below.
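For concreteness, the suggested cap is a one-line change over the snippet quoted above (statsWaitCapMs is an illustrative config value, not an existing setting):

```java
// Bound the post-EOS stats wait by a small ceiling instead of the full remaining query budget;
// anything still outstanding after the cap shows up as streamStatsCoverage[].missing > 0.
long remainingMs = Math.max(0, deadlineMs - System.currentTimeMillis());
long statsWaitMs = Math.min(statsWaitCapMs, remainingMs);   // e.g. 50-100 ms default, broker-configurable
boolean fullCoverage = session.awaitCompletion(statsWaitMs, TimeUnit.MILLISECONDS);
```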

java.util.function.BiConsumer<Worker.QueryResponse, Throwable> ackCallback) {
StreamingDispatchObserver observer = new StreamingDispatchObserver(virtualServer, session,
expectedOpChainsForThisServer, ackCallback);
StreamObserver<Worker.BrokerToServer> outbound = _dispatchStub.withDeadline(deadline).submitWithStream(observer);
Contributor

HTTP/2 stream cap will become the throughput ceiling. Each DispatchClient still holds a single ManagedChannel per server (the TODO at the top of the class notes this), and maxConcurrentCallsPerConnection isn't bumped on either side. Behavioral change vs unary submit: a stream slot used to be held for milliseconds, submitWithStream holds one for the full query lifetime.

With Q in-flight queries × N servers fanned out, each broker→server HTTP/2 connection needs ~Q concurrent streams. Netty/gRPC default MAX_CONCURRENT_STREAMS=100 means at high QPS new calls queue at the channel level — manifesting as broker slowness, not stream errors. Recommend before flipping _streamStats=true by default:

  • Bump server-side NettyServerBuilder.maxConcurrentCallsPerConnection in QueryServer.buildGrpcServer (today's code doesn't set it).
  • Implement the channel-pool TODO above this class, or document the per-server stream-count ceiling.
  • Add numActiveStreamsByServer metric for capacity observability.
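For reference, the server-side knob in the first bullet would look roughly like this in QueryServer's gRPC server construction (the value is illustrative; the author's reply below disputes whether the Java default actually binds):

```java
// Explicitly set the per-connection concurrent-call ceiling when building the gRPC server,
// so long-lived SubmitWithStream calls cannot silently exhaust a connection's stream budget.
Server grpcServer = NettyServerBuilder.forPort(grpcPort)
    .maxConcurrentCallsPerConnection(4096)   // illustrative; grpc-java's server default is Integer.MAX_VALUE
    .addService(queryWorkerService)
    .build();
```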

Contributor Author

This is incorrect. First, MAX_CONCURRENT_STREAMS is Integer.MAX_VALUE in Java (it seems it is 100 in Go). Anyway, the idea that new calls queue up on the channel makes sense, but that should not be a problem.

Specifically, we are not adding much noise here. The new streams are not busy; they just send a small message when the opchains finish, which is data we don't send through the mailbox. In fact, the number of bytes sent here is smaller, as the mailbox mechanism needs to send the stats of the deeper stages multiple times (once for each stage between them and the root stage).

That is why the suggested metric doesn't make much sense: even if the number of calls per connection is high, these should be mostly idle calls.

Instead, I created two other metrics:

  • MSE_STREAM_STATS_INCOMPLETE_COVERAGE: number of queries that returned partial stats
  • MSE_STREAM_STATS_QUERIES: total queries using the new mechanism

}
if (statsTree.hasCurrentStage()) {
try {
MultiStageStatsTreeDecoder.Decoded decoded = MultiStageStatsTreeDecoder.decode(statsTree);
Contributor

Decode + merge under _lock on the Netty event loop. The class Javadoc says "doing it on the I/O thread is fine" — true at low QPS, but two real problems show up under load:

  1. Cross-query head-of-line blocking. Netty client event loops are shared across all open streams (across queries). A slow decode here delays OpChainComplete delivery for other queries on the same loop.
  2. Lock contention amplification. Two servers reporting near-simultaneously for the same query busy-spin the second I/O thread until the first releases.

MultiStageStatsTreeDecoder.decode is a recursive walk that allocates per node and deserializes StatMap bytes; mergeIntoAccumulatorLocked then walks again calling StatMap.merge per node. Suggest moving decode+merge to a small worker pool and keeping only the _completionLatch.countDown() + accumulator put under _lock. At minimum, please verify under a representative high-QPS soak before flipping the cluster default.

Contributor Author

This is a valid concern. I've modified the code to decode outside the lock, but we need the lock to protect the merge code. I think that, now that we have virtual threads, the best way to resolve this is to have a virtual thread handle decoding and merging the stats for each query. This vthread would read the raw stats from a queue, then decode and merge them (without needing locks), similar to an actor. I added a comment suggesting that, but didn't want to introduce it here to avoid making this PR more conflict-prone.
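A sketch of that actor idea under the stated assumptions (one virtual-thread lane per query; class and method names are illustrative, only MultiStageStatsTreeDecoder comes from the PR):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Per-query "actor": gRPC I/O threads only enqueue work; a single virtual-thread lane decodes
// and merges sequentially, so the merge needs no lock at all. Pinot imports omitted.
final class StreamStatsActor implements AutoCloseable {
  private final ExecutorService _lane =
      Executors.newSingleThreadExecutor(Thread.ofVirtual().name("stream-stats-", 0).factory());

  /** Called from StreamObserver.onNext on the event loop: cheap, non-blocking hand-off. */
  void submit(Worker.MultiStageStatsTree raw) {
    _lane.execute(() -> {
      try {
        MultiStageStatsTreeDecoder.Decoded decoded = MultiStageStatsTreeDecoder.decode(raw);
        mergeIntoAccumulator(decoded);   // runs on the single lane, so no _lock is required
      } catch (Exception e) {
        markStageMergeFailed(e);         // same mergeFailed accounting the session does today
      }
    });
  }

  private void mergeIntoAccumulator(MultiStageStatsTreeDecoder.Decoded decoded) {
    // per-stage StageStatsTreeNode.merge(...), as in StreamingQuerySession (omitted)
  }

  private void markStageMergeFailed(Exception e) {
    // bump the mergeFailed counter for the affected stage (omitted)
  }

  @Override
  public void close() {
    _lane.shutdown();                    // when the session completes or the query is torn down
  }
}
```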

if (!isSuccess) {
if (!_peerErrorObserved) {
_peerErrorObserved = true;
shouldFanOutCancel = true;
Contributor

Cancel fan-out is undebounced. First success=false or transport-level error immediately broadcasts cancel to all N-1 peer servers. At scale this amplifies in two ways:

  • A flaky server returning errors on a small fraction of queries now triggers N-1 cancel sends per affected query — at high QPS during a partial degradation this is a meaningful self-inflicted cancel storm.
  • recordStreamError also triggers fan-out on any onError (broker GC pause beyond keep-alive, idle timeout, network reset). Premature-cancel probability grows with N × queryDuration.

Consider:

  • Debounce (cancel only after K errors / K ms).
  • Distinguish "server returned error" from "stream transport closed" — transport blips shouldn't kill peers.
  • Emit a cancelFanoutsBroadcast{reason} metric so this is observable in prod.

Contributor Author

A flaky server returning errors on a small fraction of queries now triggers N-1 cancel sends per affected query — at high QPS during a partial degradation, this is a meaningful self-inflicted cancel storm.

The alternative is the system we have now, where opchains are not stopped until they time out or finish, even if no reader is interested in their computations because they were early-terminated. The issue here is not so much sending N-1 gRPC messages but how expensive it is to cancel each query. That used to be somewhat expensive, because OpChainSchedulerService had to iterate over every _opChainCache.asMap() entry looking for the opchains with the given request id; now the lookup is direct.

Anyway, I think we can play it safe here and keep the original behavior unless a query option/broker config is used. I'm going to change that.

}
if (stats != null) {
try {
builder.setStats(MultiStageStatsTreeEncoder.encode(rootOperator, stats, context));
Contributor

Two concerns here:

  1. Worse error-path coverage than legacy. When the encoder throws (most likely on treeSize != flatSize from an opchain that failed before emitting EOS), this emits success=false with empty stats. Stream mode also suppresses mailbox stats (see effectiveSendStats), so for this opchain the broker now has strictly less info than legacy mode would deliver via mailbox EOS or the cancel-RPC fallback. Worth either:
    • documenting this as a behavior change in the PR description, or
    • falling back to a best-effort flat encoder that emits whatever entries the flat list has, even when the tree shape doesn't align.
  2. Encode + onNext under _streamLock on the opchain executor thread, with NettyServerBuilder.directExecutor() set at class init (line 217) — inbound handlers also run on the same loop. Wide trees stall the loop both directions. Suggest moving encode + send to a small bounded worker pool.

}
break;
case OPCHAIN:
_session.recordOpChainComplete(message.getOpchain());
Contributor

This synchronously kicks decode + merge on the gRPC client event loop via _session.recordOpChainComplete. See companion concern on StreamingQuerySession#recordOpChainComplete — at high QPS, expensive per-event work on the shared client event loop causes cross-query HOL blocking.

Aside: _opChainsReportedForThisServer is a plain int. The class Javadoc correctly notes gRPC serializes inbound callbacks per stream, but cancel() on this same instance is called from a different I/O thread (via fanOutCancel's iteration of _openStreams). Today that's safe because cancel() only touches _outbound (which is volatile), but the threading boundary is subtle — a short note on what runs on which thread would help future maintainers.

Contributor Author

This synchronously kicks decode + merge on the gRPC client event loop via _session.recordOpChainComplete. See companion concern on StreamingQuerySession#recordOpChainComplete — at high QPS, expensive per-event work on the shared client event loop causes cross-query HOL blocking.

This is the same as what the agent pointed out above, right? If that is the case: I've moved the decode phase outside the lock. Merging needs the lock and should be fast; if it turns out not to be, we will need to refactor the code a bit. I've added a comment indicating how to do that with a virtual thread.

Contributor Author

Aside: _opChainsReportedForThisServer is a plain int. The class Javadoc correctly notes gRPC serializes inbound callbacks per stream, but cancel() on this same instance is called from a different I/O thread (via fanOutCancel's iteration of _openStreams). Today that's safe because cancel() only touches _outbound (which is volatile), but the threading boundary is subtle — a short note on what runs on which thread would help future maintainers.

I'll improve the Javadoc.

} catch (IOException e) {
throw new DecodeFailedException("Failed to deserialize StatMap for operator type " + type.name(), e);
}
List<StageStatsTreeNode> children = new ArrayList<>(node.getChildrenCount());
Contributor

Two concerns about this recursive walk:

  1. Unbounded stack depth. Driven by the proto's children list. A pathological / malformed payload (or a future bug-injected loop) can overflow the Netty event loop stack. Add an explicit depth cap (e.g. MAX_OPERATOR_TREE_DEPTH = 64) and short-circuit to DecodeFailedException.
  2. Per-query broker memory. Each opchain report keeps a fully-deserialized StageStatsTreeNode graph alive on the broker for the query's wait window. Aggregate broker footprint becomes ~O(numQueries × numServers × operatorsPerStage) — strictly larger than legacy's compact byte-merge buffer. Worth a heap-snapshot comparison vs legacy under representative load before defaulting on.
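The depth guard suggested in point 1 is a small change to the recursive decode; a sketch (constant name, limit, and the buildNode helper are illustrative; imports omitted):

```java
// Thread a depth counter through decodeNode and fail fast with the same typed signal the
// broker already handles for unknown operator type ids.
private static final int MAX_OPERATOR_TREE_DEPTH = 64;

private static StageStatsTreeNode decodeNode(Worker.StageStatsNode node, int depth) {
  if (depth > MAX_OPERATOR_TREE_DEPTH) {
    throw new DecodeFailedException(
        "Operator tree deeper than " + MAX_OPERATOR_TREE_DEPTH + " levels; refusing to decode", null);
  }
  // ... existing per-node work: resolve operator type id, deserialize StatMap bytes ...
  List<StageStatsTreeNode> children = new ArrayList<>(node.getChildrenCount());
  for (Worker.StageStatsNode child : node.getChildrenList()) {
    children.add(decodeNode(child, depth + 1));
  }
  return buildNode(node, children);   // hypothetical helper standing in for the existing constructor call
}
```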

Map<String, String> effectiveOptions = query.getOptions();
if (_streamStats && !effectiveOptions.containsKey(
CommonConstants.Broker.Request.QueryOptionKey.STREAM_STATS)) {
effectiveOptions = new HashMap<>(effectiveOptions);
Contributor

Allocates a fresh HashMap on every query whenever cluster default is on. At high QPS that's one extra alloc + put per query for what is effectively a constant. Cleaner: thread the cluster default through compileQuery / getQueryEnvConf so queryOptions carries it once, and drop this branch.

* <p>
* Identity-based (IdentityHashMap) because PlanNode equality is structural and two distinct nodes can compare equal.
*/
private final Map<MultiStageOperator, List<PlanNode>> _operatorToPlanNodes = new IdentityHashMap<>();
Contributor

_operatorToPlanNodes and _planNodeIds are allocated and populated for every opchain via PlanNodeToOpChain.assignPlanNodeIds + record, regardless of whether stats reporting will use them. In legacy mode (which remains the default) these maps are never read — pure GC dead weight on the hot path.

At high QPS × deep stages this adds up. Suggest gating the puts in PlanNodeToOpChain behind context.isSendStats() == false (i.e. stream mode active per KEY_OF_STATS_REPORTING_MODE), or making these maps lazy.

session.fanOutCancel();
long remainingMs = Math.max(0, deadlineMs - System.currentTimeMillis());
try {
session.awaitCompletion(remainingMs, TimeUnit.MILLISECONDS);
Contributor

Error path also waits full remaining timeout — same root issue as the success-path comment up the file. A query that hits an error at t=100ms with a 10s timeout doesn't return to the client until t=10000ms, vs legacy returning immediately on cancel.

For the error path specifically, blocking the client just to collect more stats is the wrong trade — you've already given up on the query. Use a much smaller cap here (e.g. 100–200ms) and represent the rest via streamStatsCoverage[].missing > 0.

gortiz and others added 3 commits May 14, 2026 09:11
…H benchmark

Three improvements in response to review comments:

1. Move proto decode outside the per-query lock in StreamingQuerySession.
   MultiStageStatsTreeDecoder.decode() is a recursive proto walk that
   allocates per node — keeping it inside _lock caused every Netty I/O
   thread receiving an opchain for the same query to serialize on the
   decode work, not just the map mutations. The decoded result is a fresh
   allocation with no shared state, so the decode is safe to perform
   before acquiring the lock. Only the accumulator map mutations are now
   done under _lock, reducing hold time to O(operators) merge work.

2. Cap the error-path stats wait at 50ms (STATS_DRAIN_ON_ERROR_MS).
   tryRecoverWithStream previously waited up to the full remaining query
   timeout after fanning out cancel — regressing error-path latency to
   the query deadline. The query result is already determined at this
   point; we collect partial stats on a best-effort basis only, so a
   short cap is appropriate. Missing stages are represented via
   streamStatsCoverage[].missing > 0.

3. Add BenchmarkStreamStatsMSQE to pinot-perf.
   JMH throughput benchmark with 2 concurrent threads comparing
   legacy stats mode vs stream-stats mode. Covers an aggregation
   (single-stage stats path) and a self-join (multi-stage stats path
   with multiple opchain completions per query). Run with:
     java -jar pinot-perf/target/benchmarks.jar BenchmarkStreamStatsMSQE

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…afety comments

- QueryServer: use compareAndSet in onError to match sendDoneAndComplete pattern and
  avoid double-cancel when the stream was already completed; add JMM ordering comment
  for _expectedOpChains.set → registerOpChainCompletionListener dependency
- StreamingQuerySession: remove unused 1-arg recordStreamError overload (all callers
  already use the 3-arg variant that passes the precise per-server remaining count);
  tighten Javadoc on the 3-arg variant explaining why the count must be precise; add
  virtual-thread actor comment on _lock for future contention mitigation
- QueryDispatcher: add comment on InterruptedException handling in tryRecoverWithStream
  explaining that mergeSessionStatsIntoResult does not block
- BrokerMeter: add MSE_STREAM_STATS_QUERIES and MSE_STREAM_STATS_INCOMPLETE_COVERAGE
  counters so operators can track stream-mode adoption and partial-coverage frequency
- MultiStageBrokerRequestHandler: wire the two new meters after submitAndReduce

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…sleep workload

- Replace single-server setup with startServers(2) and 4 concurrent threads
- Add tripleJoin benchmark (3 leaf + 2 join intermediates + 1 aggregate)
- Add sleep benchmark with CASE expression to block Calcite constant-folding
- Fix data race: pre-clone one supplier per segment instead of sharing one
  and calling snapshot() concurrently from parallel CompletableFutures
- Fix NPE: guard result.get("exceptions") before dereferencing .get(0)
- Fix stream stats option: pass via queryOptions string (canonical path)
- Fix getTableName() override so waitForAllDocsLoaded targets the right table
- Warn at setUp() when -ea is absent (sleep benchmark measures zero latency)
- Switch to AverageTime / 1s iterations (faster feedback than Throughput / 10s)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>