Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion be/src/runtime/workload_management/workload_condition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,19 @@ bool WorkloadConditionScanBytes::eval(std::string str_val) {
return WorkloadCompareUtils::compare_signed_integer(_op, scan_bytes_args, _scan_bytes);
}

// remote scan bytes
WorkloadConditionScanBytesFromRemoteStorage::WorkloadConditionScanBytesFromRemoteStorage(
WorkloadCompareOperator op, std::string str_val) {
_op = op;
_scan_bytes_from_remote_storage = std::stol(str_val);
}

bool WorkloadConditionScanBytesFromRemoteStorage::eval(std::string str_val) {
int64_t scan_bytes_from_remote_storage_args = std::stol(str_val);
return WorkloadCompareUtils::compare_signed_integer(_op, scan_bytes_from_remote_storage_args,
_scan_bytes_from_remote_storage);
}

// query memory
WorkloadConditionQueryMemory::WorkloadConditionQueryMemory(WorkloadCompareOperator op,
std::string str_val) {
Expand All @@ -80,4 +93,4 @@ bool WorkloadConditionUsername::eval(std::string str_val) {
return WorkloadCompareUtils::compare_string(_op, str_val, _username);
}

} // namespace doris
} // namespace doris
33 changes: 31 additions & 2 deletions be/src/runtime/workload_management/workload_condition.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,15 @@

namespace doris {

enum WorkloadMetricType { QUERY_TIME, SCAN_ROWS, SCAN_BYTES, QUERY_MEMORY_BYTES, USERNAME };
enum WorkloadMetricType {
QUERY_TIME,
SCAN_ROWS,
SCAN_BYTES,
// Extend the BE workload metric enum with remote scan bytes support.
SCAN_BYTES_FROM_REMOTE_STORAGE,
QUERY_MEMORY_BYTES,
USERNAME
};

class WorkloadCondition {
public:
Expand Down Expand Up @@ -90,6 +98,25 @@ class WorkloadConditionScanBytes : public WorkloadCondition {
WorkloadCompareOperator _op;
};

class WorkloadConditionScanBytesFromRemoteStorage : public WorkloadCondition {
public:
WorkloadConditionScanBytesFromRemoteStorage(WorkloadCompareOperator op, std::string str_val);
bool eval(std::string str_val) override;
WorkloadMetricType get_workload_metric_type() override {
return WorkloadMetricType::SCAN_BYTES_FROM_REMOTE_STORAGE;
}

std::string get_metric_string() override { return "scan_bytes_from_remote_storage"; }

std::string get_metric_value_string() override {
return std::to_string(_scan_bytes_from_remote_storage);
}

private:
int64_t _scan_bytes_from_remote_storage;
WorkloadCompareOperator _op;
};

class WorkloadConditionQueryMemory : public WorkloadCondition {
public:
WorkloadConditionQueryMemory(WorkloadCompareOperator op, std::string str_val);
Expand Down Expand Up @@ -136,6 +163,8 @@ class WorkloadConditionFactory {
return std::make_unique<WorkloadConditionScanRows>(op, str_val);
} else if (TWorkloadMetricType::type::BE_SCAN_BYTES == metric_name) {
return std::make_unique<WorkloadConditionScanBytes>(op, str_val);
} else if (TWorkloadMetricType::type::BE_SCAN_BYTES_FROM_REMOTE_STORAGE == metric_name) {
return std::make_unique<WorkloadConditionScanBytesFromRemoteStorage>(op, str_val);
} else if (TWorkloadMetricType::type::QUERY_BE_MEMORY_BYTES == metric_name) {
return std::make_unique<WorkloadConditionQueryMemory>(op, str_val);
} else if (TWorkloadMetricType::type::USERNAME == metric_name) {
Expand All @@ -146,4 +175,4 @@ class WorkloadConditionFactory {
}
};

} // namespace doris
} // namespace doris
6 changes: 6 additions & 0 deletions be/src/runtime/workload_management/workload_sched_policy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@ bool WorkloadSchedPolicy::is_match(WorkloadAction::RuntimeContext* action_runtim
val = std::to_string(action_runtime_ctx->resource_ctx->io_context()->scan_bytes());
break;
}
// Evaluate the remote read breaker against the existing IO context remote scan counter.
case WorkloadMetricType::SCAN_BYTES_FROM_REMOTE_STORAGE: {
val = std::to_string(action_runtime_ctx->resource_ctx->io_context()
->scan_bytes_from_remote_storage());
break;
}
case WorkloadMetricType::SCAN_ROWS: {
val = std::to_string(action_runtime_ctx->resource_ctx->io_context()->scan_rows());
break;
Expand Down
34 changes: 33 additions & 1 deletion be/test/runtime/workload_sched_policy_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,39 @@ TEST_F(WorkloadSchedPolicyTest, one_policy_one_condition) {
<< ": " << action_runtime_ctx.resource_ctx->io_context()->scan_bytes();
}

// 5 check query be memory bytes
// 5 check remote scan bytes
{
std::shared_ptr<WorkloadSchedPolicy> policy = std::make_shared<WorkloadSchedPolicy>();
std::vector<std::unique_ptr<WorkloadCondition>> cond_ptr_list;
cond_ptr_list.push_back(create_workload_condition(
TWorkloadMetricType::type::BE_SCAN_BYTES_FROM_REMOTE_STORAGE,
TCompareOperator::type::GREATER, "1000"));
std::vector<std::unique_ptr<WorkloadAction>> action_ptr_list;
action_ptr_list.push_back(create_workload_action(TWorkloadActionType::type::CANCEL_QUERY));
std::set<int64_t> wg_id_set;
policy->init(0, "p1", 0, true, 0, wg_id_set, std::move(cond_ptr_list),
std::move(action_ptr_list));

// Updating total scan bytes alone must not satisfy the remote read condition.
WorkloadAction::RuntimeContext action_runtime_ctx = create_runtime_context();
action_runtime_ctx.resource_ctx->io_context()->update_scan_bytes(1001);
EXPECT_FALSE(policy->is_match(&action_runtime_ctx))
<< ": " << action_runtime_ctx.resource_ctx->io_context()->scan_bytes();

// Updating remote scan bytes below the threshold must still miss.
action_runtime_ctx.resource_ctx->io_context()->update_scan_bytes_from_remote_storage(999);
EXPECT_FALSE(policy->is_match(&action_runtime_ctx))
<< ": "
<< action_runtime_ctx.resource_ctx->io_context()->scan_bytes_from_remote_storage();

// Only the remote scan bytes counter should drive this metric to a match.
action_runtime_ctx.resource_ctx->io_context()->update_scan_bytes_from_remote_storage(2);
EXPECT_TRUE(policy->is_match(&action_runtime_ctx))
<< ": "
<< action_runtime_ctx.resource_ctx->io_context()->scan_bytes_from_remote_storage();
}

// 6 check query be memory bytes
{
std::shared_ptr<WorkloadSchedPolicy> policy = std::make_shared<WorkloadSchedPolicy>();
std::vector<std::unique_ptr<WorkloadCondition>> cond_ptr_list;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ static WorkloadCondition createWorkloadCondition(WorkloadConditionMeta cm)
return WorkloadConditionBeScanRows.createWorkloadCondition(cm.op, cm.value);
} else if (WorkloadMetricType.BE_SCAN_BYTES.equals(cm.metricName)) {
return WorkloadConditionBeScanBytes.createWorkloadCondition(cm.op, cm.value);
// Register the remote scan bytes condition so FE can parse and persist the new metric.
} else if (WorkloadMetricType.BE_SCAN_BYTES_FROM_REMOTE_STORAGE.equals(cm.metricName)) {
return WorkloadConditionBeScanBytesFromRemoteStorage.createWorkloadCondition(cm.op, cm.value);
} else if (WorkloadMetricType.QUERY_BE_MEMORY_BYTES.equals(cm.metricName)) {
return WorkloadConditionQueryBeMemory.createWorkloadCondition(cm.op, cm.value);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.resource.workloadschedpolicy;

import org.apache.doris.common.UserException;

/**
* Workload condition for matching query remote scan bytes collected on BE.
*/
public class WorkloadConditionBeScanBytesFromRemoteStorage implements WorkloadCondition {

private long value;

private WorkloadConditionOperator op;

public WorkloadConditionBeScanBytesFromRemoteStorage(WorkloadConditionOperator op, long value) {
this.op = op;
this.value = value;
}

@Override
public boolean eval(String strValue) {
// Currently this metric is evaluated only on BE, so FE-side matching always returns false.
return false;
}

public static WorkloadConditionBeScanBytesFromRemoteStorage createWorkloadCondition(
WorkloadConditionOperator op, String value) throws UserException {
long longValue = -1;
try {
longValue = Long.parseLong(value);
if (longValue < 0) {
throw new NumberFormatException();
}
} catch (NumberFormatException e) {
throw new UserException("invalid remote scan bytes value: " + value + ", it requires >= 0");
}
return new WorkloadConditionBeScanBytesFromRemoteStorage(op, longValue);
}

@Override
public WorkloadMetricType getMetricType() {
return WorkloadMetricType.BE_SCAN_BYTES_FROM_REMOTE_STORAGE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,6 @@
package org.apache.doris.resource.workloadschedpolicy;

public enum WorkloadMetricType {
USERNAME, QUERY_TIME, BE_SCAN_ROWS, BE_SCAN_BYTES, QUERY_BE_MEMORY_BYTES
// Keep the metric enum aligned with the workload policy metrics published to BE.
USERNAME, QUERY_TIME, BE_SCAN_ROWS, BE_SCAN_BYTES, BE_SCAN_BYTES_FROM_REMOTE_STORAGE, QUERY_BE_MEMORY_BYTES
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,9 @@ public WorkloadSchedPolicyMgr() {

public static final ImmutableSet<WorkloadMetricType> BE_METRIC_SET
= new ImmutableSet.Builder<WorkloadMetricType>().add(WorkloadMetricType.BE_SCAN_ROWS)
.add(WorkloadMetricType.BE_SCAN_BYTES).add(WorkloadMetricType.QUERY_TIME)
.add(WorkloadMetricType.BE_SCAN_BYTES)
// Treat remote scan bytes as a BE-only runtime metric.
.add(WorkloadMetricType.BE_SCAN_BYTES_FROM_REMOTE_STORAGE).add(WorkloadMetricType.QUERY_TIME)
.add(WorkloadMetricType.QUERY_BE_MEMORY_BYTES).add(WorkloadMetricType.USERNAME).build();

// used for convert fe type to thrift type
Expand All @@ -114,6 +116,9 @@ public WorkloadSchedPolicyMgr() {
.put(WorkloadMetricType.QUERY_TIME, TWorkloadMetricType.QUERY_TIME)
.put(WorkloadMetricType.BE_SCAN_ROWS, TWorkloadMetricType.BE_SCAN_ROWS)
.put(WorkloadMetricType.BE_SCAN_BYTES, TWorkloadMetricType.BE_SCAN_BYTES)
// Map the new FE metric enum to the appended thrift metric enum.
.put(WorkloadMetricType.BE_SCAN_BYTES_FROM_REMOTE_STORAGE,
TWorkloadMetricType.BE_SCAN_BYTES_FROM_REMOTE_STORAGE)
.put(WorkloadMetricType.QUERY_BE_MEMORY_BYTES, TWorkloadMetricType.QUERY_BE_MEMORY_BYTES)
.put(WorkloadMetricType.USERNAME, TWorkloadMetricType.USERNAME).build();
public static final ImmutableMap<WorkloadActionType, TWorkloadActionType> ACTION_MAP
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.doris.common.Config;
import org.apache.doris.common.UserException;
import org.apache.doris.persist.EditLog;
import org.apache.doris.thrift.TWorkloadMetricType;

import org.junit.After;
import org.junit.Assert;
Expand Down Expand Up @@ -343,4 +344,49 @@ public void testNonCloudModeRejectsLeadingDotInWorkloadGroup() {
e.getMessage().contains("non-cloud mode"));
}
}

@Test
public void testRemoteScanBytesMetricCanCreateBePolicy() throws UserException {
List<WorkloadConditionMeta> conditionMetas = new ArrayList<>();
// Verify the new metric string can be parsed into a BE-side workload condition.
conditionMetas.add(new WorkloadConditionMeta("be_scan_bytes_from_remote_storage", ">", "100"));
List<WorkloadActionMeta> actionMetas = new ArrayList<>();
actionMetas.add(new WorkloadActionMeta("cancel_query", ""));

mgr.createWorkloadSchedPolicy("policy_remote_scan_bytes", false, conditionMetas, actionMetas, null);

Assert.assertTrue(WorkloadSchedPolicyMgr.BE_METRIC_SET.contains(
WorkloadMetricType.BE_SCAN_BYTES_FROM_REMOTE_STORAGE));
Assert.assertEquals(TWorkloadMetricType.BE_SCAN_BYTES_FROM_REMOTE_STORAGE,
WorkloadSchedPolicyMgr.METRIC_MAP.get(WorkloadMetricType.BE_SCAN_BYTES_FROM_REMOTE_STORAGE));
}

@Test
public void testRemoteScanBytesMetricRejectsNegativeValue() throws UserException {
try {
// Reject negative thresholds for the remote scan bytes breaker.
WorkloadCondition.createWorkloadCondition(
new WorkloadConditionMeta("be_scan_bytes_from_remote_storage", ">", "-1"));
Assert.fail("Should throw exception for negative remote scan bytes value");
} catch (UserException e) {
Assert.assertTrue(e.getMessage().contains("remote scan bytes"));
}
}

@Test
public void testRemoteScanBytesMetricCanNotMixWithFeAction() throws UserException {
try {
List<WorkloadConditionMeta> conditionMetas = new ArrayList<>();
// Validate the new metric follows the existing BE-only action compatibility rules.
conditionMetas.add(new WorkloadConditionMeta("be_scan_bytes_from_remote_storage", ">", "100"));
List<WorkloadActionMeta> actionMetas = new ArrayList<>();
actionMetas.add(new WorkloadActionMeta("set_session_variable", "workload_group=normal"));

mgr.createWorkloadSchedPolicy("policy_remote_scan_bytes_with_fe_action", false, conditionMetas,
actionMetas, null);
Assert.fail("Should throw exception for remote scan bytes metric with FE action");
} catch (UserException e) {
Assert.assertTrue(e.getMessage().contains("action and metric must run in FE together or run in BE together"));
}
}
}
4 changes: 3 additions & 1 deletion gensrc/thrift/BackendService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,9 @@ enum TWorkloadMetricType {
BE_SCAN_ROWS = 1,
BE_SCAN_BYTES = 2,
QUERY_BE_MEMORY_BYTES = 3,
USERNAME = 4
USERNAME = 4,
// Append the new enum value to keep existing metric ids stable across versions.
BE_SCAN_BYTES_FROM_REMOTE_STORAGE = 5
}

enum TCompareOperator {
Expand Down
Loading
Loading