From adcc9eaffb146cfba40de91cce2f7641803325bb Mon Sep 17 00:00:00 2001 From: Yash Mayya Date: Fri, 15 May 2026 13:51:22 -0700 Subject: [PATCH 1/2] Add broker config to enable streaming group-by by default for a cluster --- .../MultiStageBrokerRequestHandler.java | 26 ++++++ .../MultiStageBrokerRequestHandlerTest.java | 81 +++++++++++++++++++ .../pinot/spi/utils/CommonConstants.java | 10 +++ 3 files changed, 117 insertions(+) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java index 846f6acb3c83..a5ad9811d605 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.annotations.VisibleForTesting; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; @@ -139,6 +140,8 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler { private final Set _defaultDisabledPlannerRules; protected final long _extraPassiveTimeoutMs; protected final boolean _enableQueryFingerprinting; + @Nullable + protected final String _defaultStreamingGroupByFlushThreshold; protected final PinotMeter _stagesStartedMeter = BrokerMeter.MSE_STAGES_STARTED.getGlobalMeter(); protected final PinotMeter _stagesFinishedMeter = BrokerMeter.MSE_STAGES_COMPLETED.getGlobalMeter(); @@ -204,6 +207,13 @@ tlsConfig, isQueryCancellationEnabled(), cancelTimeout, dispatchKeepAliveTimeMs, _enableQueryFingerprinting = _config.getProperty( CommonConstants.Broker.CONFIG_OF_BROKER_ENABLE_QUERY_FINGERPRINTING, 
CommonConstants.Broker.DEFAULT_BROKER_ENABLE_QUERY_FINGERPRINTING); + int streamingGroupByFlushThreshold = _config.getProperty( + CommonConstants.Broker.CONFIG_OF_MSE_STREAMING_GROUP_BY_FLUSH_THRESHOLD, + CommonConstants.Broker.DEFAULT_MSE_STREAMING_GROUP_BY_FLUSH_THRESHOLD); + // Pre-format the threshold once so that we don't allocate a new String on every query when the feature is enabled. + // null indicates "feature disabled", which matches the broker-config-unset case. + _defaultStreamingGroupByFlushThreshold = + streamingGroupByFlushThreshold > 0 ? Integer.toString(streamingGroupByFlushThreshold) : null; } @Override @@ -398,6 +408,9 @@ protected BrokerResponse handleRequestThrowing(long requestId, String query, Sql AtomicBoolean rlsFiltersApplied = new AtomicBoolean(false); checkAuthorization(requesterIdentity, requestContext, httpHeaders, compiledQuery, rlsFiltersApplied); + // Apply broker-default query options before branching to EXPLAIN/execute so both paths see the same options. + applyBrokerDefaultQueryOptions(compiledQuery.getOptions()); + if (sqlNodeAndOptions.getSqlNode().getKind() == SqlKind.EXPLAIN) { return explain(compiledQuery, requestId, requestContext, queryTimer); } else { @@ -545,6 +558,19 @@ private ImmutableQueryEnvironment.Config getQueryEnvConf(HttpHeaders httpHeaders .build(); } + /** + * Applies broker-level defaults for MSE query options. Per-query overrides (i.e. {@code SET option = value} in the + * SQL text) always win because we use {@link Map#putIfAbsent} — a user can set the option to {@code 0} to opt out of + * a streaming default that the cluster has enabled. 
+ */ + @VisibleForTesting + void applyBrokerDefaultQueryOptions(Map<String, String> queryOptions) { + if (_defaultStreamingGroupByFlushThreshold != null) { + queryOptions.putIfAbsent(CommonConstants.Broker.Request.QueryOptionKey.STREAMING_GROUP_BY_FLUSH_THRESHOLD, + _defaultStreamingGroupByFlushThreshold); + } + } + private long getTimeoutMs(Map<String, String> queryOptions) { Long timeoutMsFromQueryOption = QueryOptionsUtils.getTimeoutMs(queryOptions); return timeoutMsFromQueryOption != null ? timeoutMsFromQueryOption : _brokerTimeoutMs; diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandlerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandlerTest.java index 7bb7b0776102..360f309a8511 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandlerTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandlerTest.java @@ -19,6 +19,8 @@ package org.apache.pinot.broker.requesthandler; import com.fasterxml.jackson.databind.JsonNode; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; import javax.ws.rs.core.HttpHeaders; @@ -37,6 +39,8 @@ import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.eventlistener.query.BrokerQueryEventListenerFactory; import org.apache.pinot.spi.trace.RequestContext; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey; import org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner; import org.apache.pinot.spi.utils.NetUtils; import org.apache.pinot.sql.parsers.SqlNodeAndOptions; @@ -108,4 +112,81 @@ protected void onQueryCompletion(RequestContext requestContext, BrokerResponse b Assert.assertNotNull(capturedResponse.get(), "onQueryCompletion hook must be called with the 
BrokerResponse from handleRequest for MSE"); } + + @Test + public void testApplyBrokerDefaultQueryOptionsInjectsStreamingGroupByFlushThreshold() + throws Exception { + // When the broker config is set, the option is injected for queries that don't already specify it. + MultiStageBrokerRequestHandler handler = newHandlerWithStreamingGroupByFlushThreshold("5000"); + + Map<String, String> queryOptions = new HashMap<>(); + handler.applyBrokerDefaultQueryOptions(queryOptions); + Assert.assertEquals(queryOptions.get(QueryOptionKey.STREAMING_GROUP_BY_FLUSH_THRESHOLD), "5000", + "Broker default should be injected when query option is absent"); + } + + @Test + public void testApplyBrokerDefaultQueryOptionsPerQueryOverrideWins() + throws Exception { + // A per-query SET — including SET = 0 to disable — must take precedence over the broker default. + MultiStageBrokerRequestHandler handler = newHandlerWithStreamingGroupByFlushThreshold("5000"); + + Map<String, String> queryOptions = new HashMap<>(); + queryOptions.put(QueryOptionKey.STREAMING_GROUP_BY_FLUSH_THRESHOLD, "0"); + handler.applyBrokerDefaultQueryOptions(queryOptions); + Assert.assertEquals(queryOptions.get(QueryOptionKey.STREAMING_GROUP_BY_FLUSH_THRESHOLD), "0", + "Per-query SET = 0 must override the broker default"); + + queryOptions.clear(); + queryOptions.put(QueryOptionKey.STREAMING_GROUP_BY_FLUSH_THRESHOLD, "100"); + handler.applyBrokerDefaultQueryOptions(queryOptions); + Assert.assertEquals(queryOptions.get(QueryOptionKey.STREAMING_GROUP_BY_FLUSH_THRESHOLD), "100", + "Per-query SET must take precedence over the broker default"); + } + + @Test + public void testApplyBrokerDefaultQueryOptionsNoInjectionWhenConfigUnset() + throws Exception { + // With the broker config unset (default 0), no option is injected. 
+ MultiStageBrokerRequestHandler handler = newHandlerWithStreamingGroupByFlushThreshold(null); + + Map<String, String> queryOptions = new HashMap<>(); + handler.applyBrokerDefaultQueryOptions(queryOptions); + Assert.assertFalse(queryOptions.containsKey(QueryOptionKey.STREAMING_GROUP_BY_FLUSH_THRESHOLD), + "No option should be injected when the broker default is unset"); + } + + private static MultiStageBrokerRequestHandler newHandlerWithStreamingGroupByFlushThreshold( + @Nullable String streamingGroupByFlushThreshold) + throws Exception { + PinotConfiguration config = new PinotConfiguration(); + config.setProperty(MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME, "localhost"); + config.setProperty(MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT, Integer.toString(NetUtils.findOpenPort())); + if (streamingGroupByFlushThreshold != null) { + config.setProperty(CommonConstants.Broker.CONFIG_OF_MSE_STREAMING_GROUP_BY_FLUSH_THRESHOLD, + streamingGroupByFlushThreshold); + } + BrokerQueryEventListenerFactory.init(config); + BrokerMetrics.register(mock(BrokerMetrics.class)); + + QueryQuotaManager queryQuotaManager = mock(QueryQuotaManager.class); + when(queryQuotaManager.acquire(anyString())).thenReturn(true); + when(queryQuotaManager.acquireDatabase(anyString())).thenReturn(true); + when(queryQuotaManager.acquireApplication(anyString())).thenReturn(true); + + return new MultiStageBrokerRequestHandler(config, "testBrokerId", new BrokerRequestIdGenerator(), + mock(RoutingManager.class), new AllowAllAccessControlFactory(), queryQuotaManager, + mock(TableCache.class), mock(MultiStageQueryThrottler.class), mock(FailureDetector.class), + ThreadAccountantUtils.getNoOpAccountant(), null, mock(WorkerManager.class), mock(WorkerManager.class)) { + @Override + public void start() { + // Skip dispatcher.start() and Calcite warmupCompile — neither is needed for this test. + } + + @Override + public void shutDown() { + // Match start() — no dispatcher was started, so there is nothing to shut down. 
+ } + }; + } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index fad1c44f4af4..574e62a6a5b4 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -507,6 +507,16 @@ public static class Broker { public static final String CONFIG_OF_IGNORE_MISSING_SEGMENTS = "pinot.broker.query.ignore.missing.segments"; public static final boolean DEFAULT_IGNORE_MISSING_SEGMENTS = false; + + /** + * Default flush threshold for the streaming group-by leaf-stage operator on MSE. When {@code > 0}, the broker + * injects this value as the {@link Request.QueryOptionKey#STREAMING_GROUP_BY_FLUSH_THRESHOLD} query option for + * MSE queries that do not already specify it, opting the cluster into the streaming group-by behavior by default. + * Setting the query option explicitly (including to {@code 0} to disable) always wins over the broker default. + */ + public static final String CONFIG_OF_MSE_STREAMING_GROUP_BY_FLUSH_THRESHOLD = + "pinot.broker.mse.streaming.group.by.flush.threshold"; + public static final int DEFAULT_MSE_STREAMING_GROUP_BY_FLUSH_THRESHOLD = 0; // Whether to infer partition hint by default or not. 
// This value can always be overridden by INFER_PARTITION_HINT query option public static final String CONFIG_OF_INFER_PARTITION_HINT = "pinot.broker.multistage.infer.partition.hint"; From ff1a053d9c83b326d8639dac23033b9aaf88b3f1 Mon Sep 17 00:00:00 2001 From: Yash Mayya Date: Wed, 20 May 2026 17:28:37 -0700 Subject: [PATCH 2/2] Address review comments: markdown Javadoc, use -1 as disabled sentinel --- .../MultiStageBrokerRequestHandlerTest.java | 2 +- .../org/apache/pinot/spi/utils/CommonConstants.java | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandlerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandlerTest.java index 360f309a8511..b4b5a13b5fc4 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandlerTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandlerTest.java @@ -147,7 +147,7 @@ public void testApplyBrokerDefaultQueryOptionsPerQueryOverrideWins() @Test public void testApplyBrokerDefaultQueryOptionsNoInjectionWhenConfigUnset() throws Exception { - // With the broker config unset (default 0), no option is injected. + // With the broker config unset (default -1), no option is injected. 
MultiStageBrokerRequestHandler handler = newHandlerWithStreamingGroupByFlushThreshold(null); Map<String, String> queryOptions = new HashMap<>(); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 574e62a6a5b4..72e40d630493 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -509,14 +509,14 @@ public static class Broker { public static final boolean DEFAULT_IGNORE_MISSING_SEGMENTS = false; /** - * Default flush threshold for the streaming group-by leaf-stage operator on MSE. When {@code > 0}, the broker - * injects this value as the {@link Request.QueryOptionKey#STREAMING_GROUP_BY_FLUSH_THRESHOLD} query option for - * MSE queries that do not already specify it, opting the cluster into the streaming group-by behavior by default. - * Setting the query option explicitly (including to {@code 0} to disable) always wins over the broker default. + * Default flush threshold for the streaming group-by leaf-stage operator on MSE. When positive, the broker + * injects this value as the `streamingGroupByFlushThreshold` query option for MSE queries that do not already + * specify it, opting the cluster into the streaming group-by behavior by default. Setting the query option + * explicitly (including to `0` to disable) always wins over the broker default. */ public static final String CONFIG_OF_MSE_STREAMING_GROUP_BY_FLUSH_THRESHOLD = "pinot.broker.mse.streaming.group.by.flush.threshold"; - public static final int DEFAULT_MSE_STREAMING_GROUP_BY_FLUSH_THRESHOLD = 0; + public static final int DEFAULT_MSE_STREAMING_GROUP_BY_FLUSH_THRESHOLD = -1; // Whether to infer partition hint by default or not. 
// This value can always be overridden by INFER_PARTITION_HINT query option public static final String CONFIG_OF_INFER_PARTITION_HINT = "pinot.broker.multistage.infer.partition.hint";