Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.annotations.VisibleForTesting;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -139,6 +140,8 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
private final Set<String> _defaultDisabledPlannerRules;
protected final long _extraPassiveTimeoutMs;
protected final boolean _enableQueryFingerprinting;
@Nullable
protected final String _defaultStreamingGroupByFlushThreshold;

protected final PinotMeter _stagesStartedMeter = BrokerMeter.MSE_STAGES_STARTED.getGlobalMeter();
protected final PinotMeter _stagesFinishedMeter = BrokerMeter.MSE_STAGES_COMPLETED.getGlobalMeter();
Expand Down Expand Up @@ -204,6 +207,13 @@ tlsConfig, isQueryCancellationEnabled(), cancelTimeout, dispatchKeepAliveTimeMs,
_enableQueryFingerprinting = _config.getProperty(
CommonConstants.Broker.CONFIG_OF_BROKER_ENABLE_QUERY_FINGERPRINTING,
CommonConstants.Broker.DEFAULT_BROKER_ENABLE_QUERY_FINGERPRINTING);
int streamingGroupByFlushThreshold = _config.getProperty(
CommonConstants.Broker.CONFIG_OF_MSE_STREAMING_GROUP_BY_FLUSH_THRESHOLD,
CommonConstants.Broker.DEFAULT_MSE_STREAMING_GROUP_BY_FLUSH_THRESHOLD);
// Pre-format the threshold once so that we don't allocate a new String on every query when the feature is enabled.
// null indicates "feature disabled", which matches the broker-config-unset case.
_defaultStreamingGroupByFlushThreshold =
streamingGroupByFlushThreshold > 0 ? Integer.toString(streamingGroupByFlushThreshold) : null;
}

@Override
Expand Down Expand Up @@ -398,6 +408,9 @@ protected BrokerResponse handleRequestThrowing(long requestId, String query, Sql
AtomicBoolean rlsFiltersApplied = new AtomicBoolean(false);
checkAuthorization(requesterIdentity, requestContext, httpHeaders, compiledQuery, rlsFiltersApplied);

// Apply broker-default query options before branching to EXPLAIN/execute so both paths see the same options.
applyBrokerDefaultQueryOptions(compiledQuery.getOptions());

if (sqlNodeAndOptions.getSqlNode().getKind() == SqlKind.EXPLAIN) {
return explain(compiledQuery, requestId, requestContext, queryTimer);
} else {
Expand Down Expand Up @@ -545,6 +558,19 @@ private ImmutableQueryEnvironment.Config getQueryEnvConf(HttpHeaders httpHeaders
.build();
}

/**
 * Fills in broker-level defaults for MSE query options that the query itself did not set.
 *
 * <p>The only default handled here is the streaming group-by flush threshold. It is added through
 * {@link Map#putIfAbsent}, so a value that is already present in {@code queryOptions} (for example from a
 * {@code SET option = value} in the SQL text, including {@code 0} to opt out) is left untouched. When no broker
 * default is configured, the map is not modified.
 *
 * @param queryOptions query option map, updated in place
 */
@VisibleForTesting
void applyBrokerDefaultQueryOptions(Map<String, String> queryOptions) {
  if (_defaultStreamingGroupByFlushThreshold == null) {
    // No broker-level default is configured, so there is nothing to inject.
    return;
  }
  String optionKey = CommonConstants.Broker.Request.QueryOptionKey.STREAMING_GROUP_BY_FLUSH_THRESHOLD;
  queryOptions.putIfAbsent(optionKey, _defaultStreamingGroupByFlushThreshold);
}

private long getTimeoutMs(Map<String, String> queryOptions) {
Long timeoutMsFromQueryOption = QueryOptionsUtils.getTimeoutMs(queryOptions);
return timeoutMsFromQueryOption != null ? timeoutMsFromQueryOption : _brokerTimeoutMs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.pinot.broker.requesthandler;

import com.fasterxml.jackson.databind.JsonNode;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import javax.ws.rs.core.HttpHeaders;
Expand All @@ -37,6 +39,8 @@
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.eventlistener.query.BrokerQueryEventListenerFactory;
import org.apache.pinot.spi.trace.RequestContext;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
import org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner;
import org.apache.pinot.spi.utils.NetUtils;
import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
Expand Down Expand Up @@ -108,4 +112,81 @@ protected void onQueryCompletion(RequestContext requestContext, BrokerResponse b
Assert.assertNotNull(capturedResponse.get(),
"onQueryCompletion hook must be called with the BrokerResponse from handleRequest for MSE");
}

/**
 * With the broker config set to 5000 and an empty option map, the broker default must end up in the map.
 */
@Test
public void testApplyBrokerDefaultQueryOptionsInjectsStreamingGroupByFlushThreshold()
    throws Exception {
  MultiStageBrokerRequestHandler brokerHandler = newHandlerWithStreamingGroupByFlushThreshold("5000");
  Map<String, String> options = new HashMap<>();

  // The query supplied no value for the option, so the broker-level value is expected to be filled in.
  brokerHandler.applyBrokerDefaultQueryOptions(options);

  String injected = options.get(QueryOptionKey.STREAMING_GROUP_BY_FLUSH_THRESHOLD);
  Assert.assertEquals(injected, "5000", "Broker default should be injected when query option is absent");
}

/**
 * A value already present in the option map must survive the broker default, both for {@code "0"} (the opt-out
 * value) and for an ordinary positive value.
 */
@Test
public void testApplyBrokerDefaultQueryOptionsPerQueryOverrideWins()
    throws Exception {
  MultiStageBrokerRequestHandler brokerHandler = newHandlerWithStreamingGroupByFlushThreshold("5000");

  // Case 1: the query explicitly disables the feature with 0.
  Map<String, String> disabledByQuery = new HashMap<>();
  disabledByQuery.put(QueryOptionKey.STREAMING_GROUP_BY_FLUSH_THRESHOLD, "0");
  brokerHandler.applyBrokerDefaultQueryOptions(disabledByQuery);
  Assert.assertEquals(disabledByQuery.get(QueryOptionKey.STREAMING_GROUP_BY_FLUSH_THRESHOLD), "0",
      "Per-query SET = 0 must override the broker default");

  // Case 2: the query picks its own positive threshold.
  Map<String, String> customByQuery = new HashMap<>();
  customByQuery.put(QueryOptionKey.STREAMING_GROUP_BY_FLUSH_THRESHOLD, "100");
  brokerHandler.applyBrokerDefaultQueryOptions(customByQuery);
  Assert.assertEquals(customByQuery.get(QueryOptionKey.STREAMING_GROUP_BY_FLUSH_THRESHOLD), "100",
      "Per-query SET must take precedence over the broker default");
}

/**
 * When the broker config key is not set at all, applying broker defaults must leave the option map without the
 * streaming group-by flush threshold key.
 */
@Test
public void testApplyBrokerDefaultQueryOptionsNoInjectionWhenConfigUnset()
    throws Exception {
  // Passing null makes the helper leave the broker config key unset.
  MultiStageBrokerRequestHandler brokerHandler = newHandlerWithStreamingGroupByFlushThreshold(null);
  Map<String, String> options = new HashMap<>();

  brokerHandler.applyBrokerDefaultQueryOptions(options);

  boolean injected = options.containsKey(QueryOptionKey.STREAMING_GROUP_BY_FLUSH_THRESHOLD);
  Assert.assertFalse(injected, "No option should be injected when the broker default is unset");
}

/**
 * Creates a {@link MultiStageBrokerRequestHandler} backed by mocks, with the broker-level streaming group-by flush
 * threshold config optionally set.
 *
 * @param streamingGroupByFlushThreshold raw config value to set, or {@code null} to leave the config key unset
 * @return a handler whose {@code start()} and {@code shutDown()} are overridden to do nothing
 */
private static MultiStageBrokerRequestHandler newHandlerWithStreamingGroupByFlushThreshold(
    @Nullable String streamingGroupByFlushThreshold)
    throws Exception {
  PinotConfiguration brokerConf = new PinotConfiguration();
  brokerConf.setProperty(MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME, "localhost");
  int runnerPort = NetUtils.findOpenPort();
  brokerConf.setProperty(MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT, String.valueOf(runnerPort));
  if (streamingGroupByFlushThreshold != null) {
    brokerConf.setProperty(CommonConstants.Broker.CONFIG_OF_MSE_STREAMING_GROUP_BY_FLUSH_THRESHOLD,
        streamingGroupByFlushThreshold);
  }
  BrokerQueryEventListenerFactory.init(brokerConf);
  BrokerMetrics.register(mock(BrokerMetrics.class));

  // Quota manager that admits every table, database and application.
  QueryQuotaManager quotaManager = mock(QueryQuotaManager.class);
  when(quotaManager.acquire(anyString())).thenReturn(true);
  when(quotaManager.acquireDatabase(anyString())).thenReturn(true);
  when(quotaManager.acquireApplication(anyString())).thenReturn(true);

  BrokerRequestIdGenerator requestIdGenerator = new BrokerRequestIdGenerator();
  RoutingManager routingManager = mock(RoutingManager.class);
  TableCache tableCache = mock(TableCache.class);

  return new MultiStageBrokerRequestHandler(brokerConf, "testBrokerId", requestIdGenerator, routingManager,
      new AllowAllAccessControlFactory(), quotaManager, tableCache, mock(MultiStageQueryThrottler.class),
      mock(FailureDetector.class), ThreadAccountantUtils.getNoOpAccountant(), null, mock(WorkerManager.class),
      mock(WorkerManager.class)) {
    @Override
    public void start() {
      // Intentionally empty: dispatcher.start() and the Calcite warmupCompile are not needed for this test.
    }

    @Override
    public void shutDown() {
      // Intentionally empty to mirror start(): no dispatcher was started, so nothing needs shutting down.
    }
  };
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,16 @@ public static class Broker {
public static final String CONFIG_OF_IGNORE_MISSING_SEGMENTS =
"pinot.broker.query.ignore.missing.segments";
public static final boolean DEFAULT_IGNORE_MISSING_SEGMENTS = false;

/**
 * Default flush threshold for the streaming group-by leaf-stage operator on MSE. When positive, the broker
 * injects this value as the {@code streamingGroupByFlushThreshold} query option for MSE queries that do not
 * already specify it, opting the cluster into the streaming group-by behavior by default. Setting the query
 * option explicitly (including to {@code 0} to disable) always wins over the broker default.
 */
public static final String CONFIG_OF_MSE_STREAMING_GROUP_BY_FLUSH_THRESHOLD =
"pinot.broker.mse.streaming.group.by.flush.threshold";
// Non-positive default: the broker injects nothing unless the config is set to a positive value.
public static final int DEFAULT_MSE_STREAMING_GROUP_BY_FLUSH_THRESHOLD = -1;
// Whether to infer partition hint by default or not.
// This value can always be overridden by INFER_PARTITION_HINT query option
public static final String CONFIG_OF_INFER_PARTITION_HINT = "pinot.broker.multistage.infer.partition.hint";
Expand Down
Loading