diff --git a/be/src/runtime/workload_management/workload_condition.cpp b/be/src/runtime/workload_management/workload_condition.cpp index 4c85a95ad1fd22..618168df69056f 100644 --- a/be/src/runtime/workload_management/workload_condition.cpp +++ b/be/src/runtime/workload_management/workload_condition.cpp @@ -56,6 +56,19 @@ bool WorkloadConditionScanBytes::eval(std::string str_val) { return WorkloadCompareUtils::compare_signed_integer(_op, scan_bytes_args, _scan_bytes); } +// remote scan bytes +WorkloadConditionScanBytesFromRemoteStorage::WorkloadConditionScanBytesFromRemoteStorage( + WorkloadCompareOperator op, std::string str_val) { + _op = op; + _scan_bytes_from_remote_storage = std::stol(str_val); +} + +bool WorkloadConditionScanBytesFromRemoteStorage::eval(std::string str_val) { + int64_t scan_bytes_from_remote_storage_args = std::stol(str_val); + return WorkloadCompareUtils::compare_signed_integer(_op, scan_bytes_from_remote_storage_args, + _scan_bytes_from_remote_storage); +} + // query memory WorkloadConditionQueryMemory::WorkloadConditionQueryMemory(WorkloadCompareOperator op, std::string str_val) { @@ -80,4 +93,4 @@ bool WorkloadConditionUsername::eval(std::string str_val) { return WorkloadCompareUtils::compare_string(_op, str_val, _username); } -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/runtime/workload_management/workload_condition.h b/be/src/runtime/workload_management/workload_condition.h index 1a8c9e8dc8b6bd..15ecc8d4b25471 100644 --- a/be/src/runtime/workload_management/workload_condition.h +++ b/be/src/runtime/workload_management/workload_condition.h @@ -23,7 +23,15 @@ namespace doris { -enum WorkloadMetricType { QUERY_TIME, SCAN_ROWS, SCAN_BYTES, QUERY_MEMORY_BYTES, USERNAME }; +enum WorkloadMetricType { + QUERY_TIME, + SCAN_ROWS, + SCAN_BYTES, + // Extend the BE workload metric enum with remote scan bytes support. + SCAN_BYTES_FROM_REMOTE_STORAGE, + QUERY_MEMORY_BYTES, + USERNAME +}; class WorkloadCondition { public: @@ -90,6 +98,25 @@ class WorkloadConditionScanBytes : public WorkloadCondition { WorkloadCompareOperator _op; }; +class WorkloadConditionScanBytesFromRemoteStorage : public WorkloadCondition { +public: + WorkloadConditionScanBytesFromRemoteStorage(WorkloadCompareOperator op, std::string str_val); + bool eval(std::string str_val) override; + WorkloadMetricType get_workload_metric_type() override { + return WorkloadMetricType::SCAN_BYTES_FROM_REMOTE_STORAGE; + } + + std::string get_metric_string() override { return "scan_bytes_from_remote_storage"; } + + std::string get_metric_value_string() override { + return std::to_string(_scan_bytes_from_remote_storage); + } + +private: + int64_t _scan_bytes_from_remote_storage; + WorkloadCompareOperator _op; +}; + class WorkloadConditionQueryMemory : public WorkloadCondition { public: WorkloadConditionQueryMemory(WorkloadCompareOperator op, std::string str_val); @@ -136,6 +163,8 @@ class WorkloadConditionFactory { return std::make_unique(op, str_val); } else if (TWorkloadMetricType::type::BE_SCAN_BYTES == metric_name) { return std::make_unique(op, str_val); + } else if (TWorkloadMetricType::type::BE_SCAN_BYTES_FROM_REMOTE_STORAGE == metric_name) { + return std::make_unique(op, str_val); } else if (TWorkloadMetricType::type::QUERY_BE_MEMORY_BYTES == metric_name) { return std::make_unique(op, str_val); } else if (TWorkloadMetricType::type::USERNAME == metric_name) { @@ -146,4 +175,4 @@ class WorkloadConditionFactory { } }; -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/runtime/workload_management/workload_sched_policy.cpp b/be/src/runtime/workload_management/workload_sched_policy.cpp index e439c873ad2e3f..a3cbc003ac08a5 100644 --- a/be/src/runtime/workload_management/workload_sched_policy.cpp +++ b/be/src/runtime/workload_management/workload_sched_policy.cpp @@ -80,6 +80,12 @@ bool WorkloadSchedPolicy::is_match(WorkloadAction::RuntimeContext* action_runtim val = std::to_string(action_runtime_ctx->resource_ctx->io_context()->scan_bytes()); break; } + // Evaluate the remote read breaker against the existing IO context remote scan counter. + case WorkloadMetricType::SCAN_BYTES_FROM_REMOTE_STORAGE: { + val = std::to_string(action_runtime_ctx->resource_ctx->io_context() + ->scan_bytes_from_remote_storage()); + break; + } case WorkloadMetricType::SCAN_ROWS: { val = std::to_string(action_runtime_ctx->resource_ctx->io_context()->scan_rows()); break; diff --git a/be/test/runtime/workload_sched_policy_test.cpp b/be/test/runtime/workload_sched_policy_test.cpp index 719ce0f04ad9f1..311bf982685437 100644 --- a/be/test/runtime/workload_sched_policy_test.cpp +++ b/be/test/runtime/workload_sched_policy_test.cpp @@ -198,7 +198,39 @@ TEST_F(WorkloadSchedPolicyTest, one_policy_one_condition) { << ": " << action_runtime_ctx.resource_ctx->io_context()->scan_bytes(); } - // 5 check query be memory bytes + // 5 check remote scan bytes + { + std::shared_ptr policy = std::make_shared(); + std::vector> cond_ptr_list; + cond_ptr_list.push_back(create_workload_condition( + TWorkloadMetricType::type::BE_SCAN_BYTES_FROM_REMOTE_STORAGE, + TCompareOperator::type::GREATER, "1000")); + std::vector> action_ptr_list; + action_ptr_list.push_back(create_workload_action(TWorkloadActionType::type::CANCEL_QUERY)); + std::set wg_id_set; + policy->init(0, "p1", 0, true, 0, wg_id_set, std::move(cond_ptr_list), + std::move(action_ptr_list)); + + // Updating total scan bytes alone must not satisfy the remote read condition. + WorkloadAction::RuntimeContext action_runtime_ctx = create_runtime_context(); + action_runtime_ctx.resource_ctx->io_context()->update_scan_bytes(1001); + EXPECT_FALSE(policy->is_match(&action_runtime_ctx)) + << ": " << action_runtime_ctx.resource_ctx->io_context()->scan_bytes(); + + // Updating remote scan bytes below the threshold must still miss. + action_runtime_ctx.resource_ctx->io_context()->update_scan_bytes_from_remote_storage(999); + EXPECT_FALSE(policy->is_match(&action_runtime_ctx)) + << ": " + << action_runtime_ctx.resource_ctx->io_context()->scan_bytes_from_remote_storage(); + + // Only the remote scan bytes counter should drive this metric to a match. + action_runtime_ctx.resource_ctx->io_context()->update_scan_bytes_from_remote_storage(2); + EXPECT_TRUE(policy->is_match(&action_runtime_ctx)) + << ": " + << action_runtime_ctx.resource_ctx->io_context()->scan_bytes_from_remote_storage(); + } + + // 6 check query be memory bytes { std::shared_ptr policy = std::make_shared(); std::vector> cond_ptr_list; diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadCondition.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadCondition.java index c790a4013080d5..01dfae955dcb18 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadCondition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadCondition.java @@ -37,6 +37,9 @@ static WorkloadCondition createWorkloadCondition(WorkloadConditionMeta cm) return WorkloadConditionBeScanRows.createWorkloadCondition(cm.op, cm.value); } else if (WorkloadMetricType.BE_SCAN_BYTES.equals(cm.metricName)) { return WorkloadConditionBeScanBytes.createWorkloadCondition(cm.op, cm.value); + // Register the remote scan bytes condition so FE can parse and persist the new metric. + } else if (WorkloadMetricType.BE_SCAN_BYTES_FROM_REMOTE_STORAGE.equals(cm.metricName)) { + return WorkloadConditionBeScanBytesFromRemoteStorage.createWorkloadCondition(cm.op, cm.value); } else if (WorkloadMetricType.QUERY_BE_MEMORY_BYTES.equals(cm.metricName)) { return WorkloadConditionQueryBeMemory.createWorkloadCondition(cm.op, cm.value); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanBytesFromRemoteStorage.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanBytesFromRemoteStorage.java new file mode 100644 index 00000000000000..3b4e004971ebed --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanBytesFromRemoteStorage.java @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.resource.workloadschedpolicy; + +import org.apache.doris.common.UserException; + +/** + * Workload condition for matching query remote scan bytes collected on BE. + */ +public class WorkloadConditionBeScanBytesFromRemoteStorage implements WorkloadCondition { + + private long value; + + private WorkloadConditionOperator op; + + public WorkloadConditionBeScanBytesFromRemoteStorage(WorkloadConditionOperator op, long value) { + this.op = op; + this.value = value; + } + + @Override + public boolean eval(String strValue) { + // Currently this metric is evaluated only on BE, so FE-side matching always returns false. + return false; + } + + public static WorkloadConditionBeScanBytesFromRemoteStorage createWorkloadCondition( + WorkloadConditionOperator op, String value) throws UserException { + long longValue = -1; + try { + longValue = Long.parseLong(value); + if (longValue < 0) { + throw new NumberFormatException(); + } + } catch (NumberFormatException e) { + throw new UserException("invalid remote scan bytes value: " + value + ", it requires >= 0"); + } + return new WorkloadConditionBeScanBytesFromRemoteStorage(op, longValue); + } + + @Override + public WorkloadMetricType getMetricType() { + return WorkloadMetricType.BE_SCAN_BYTES_FROM_REMOTE_STORAGE; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadMetricType.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadMetricType.java index 93e612a85c2ddd..5d83cb845f9491 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadMetricType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadMetricType.java @@ -18,5 +18,6 @@ package org.apache.doris.resource.workloadschedpolicy; public enum WorkloadMetricType { - USERNAME, QUERY_TIME, BE_SCAN_ROWS, BE_SCAN_BYTES, QUERY_BE_MEMORY_BYTES + // Keep the metric enum aligned with the workload policy metrics published to BE. + USERNAME, QUERY_TIME, BE_SCAN_ROWS, BE_SCAN_BYTES, BE_SCAN_BYTES_FROM_REMOTE_STORAGE, QUERY_BE_MEMORY_BYTES } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java index 3442c8a7789448..07d4288b6431f7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java @@ -105,7 +105,9 @@ public WorkloadSchedPolicyMgr() { public static final ImmutableSet BE_METRIC_SET = new ImmutableSet.Builder().add(WorkloadMetricType.BE_SCAN_ROWS) - .add(WorkloadMetricType.BE_SCAN_BYTES).add(WorkloadMetricType.QUERY_TIME) + .add(WorkloadMetricType.BE_SCAN_BYTES) + // Treat remote scan bytes as a BE-only runtime metric. + .add(WorkloadMetricType.BE_SCAN_BYTES_FROM_REMOTE_STORAGE).add(WorkloadMetricType.QUERY_TIME) .add(WorkloadMetricType.QUERY_BE_MEMORY_BYTES).add(WorkloadMetricType.USERNAME).build(); // used for convert fe type to thrift type @@ -114,6 +116,9 @@ public WorkloadSchedPolicyMgr() { .put(WorkloadMetricType.QUERY_TIME, TWorkloadMetricType.QUERY_TIME) .put(WorkloadMetricType.BE_SCAN_ROWS, TWorkloadMetricType.BE_SCAN_ROWS) .put(WorkloadMetricType.BE_SCAN_BYTES, TWorkloadMetricType.BE_SCAN_BYTES) + // Map the new FE metric enum to the appended thrift metric enum. + .put(WorkloadMetricType.BE_SCAN_BYTES_FROM_REMOTE_STORAGE, + TWorkloadMetricType.BE_SCAN_BYTES_FROM_REMOTE_STORAGE) .put(WorkloadMetricType.QUERY_BE_MEMORY_BYTES, TWorkloadMetricType.QUERY_BE_MEMORY_BYTES) .put(WorkloadMetricType.USERNAME, TWorkloadMetricType.USERNAME).build(); public static final ImmutableMap ACTION_MAP diff --git a/fe/fe-core/src/test/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgrTest.java index 7461cba03d5a71..6d4b6e2e6c3a7e 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgrTest.java @@ -21,6 +21,7 @@ import org.apache.doris.common.Config; import org.apache.doris.common.UserException; import org.apache.doris.persist.EditLog; +import org.apache.doris.thrift.TWorkloadMetricType; import org.junit.After; import org.junit.Assert; @@ -343,4 +344,49 @@ public void testNonCloudModeRejectsLeadingDotInWorkloadGroup() { e.getMessage().contains("non-cloud mode")); } } + + @Test + public void testRemoteScanBytesMetricCanCreateBePolicy() throws UserException { + List conditionMetas = new ArrayList<>(); + // Verify the new metric string can be parsed into a BE-side workload condition. + conditionMetas.add(new WorkloadConditionMeta("be_scan_bytes_from_remote_storage", ">", "100")); + List actionMetas = new ArrayList<>(); + actionMetas.add(new WorkloadActionMeta("cancel_query", "")); + + mgr.createWorkloadSchedPolicy("policy_remote_scan_bytes", false, conditionMetas, actionMetas, null); + + Assert.assertTrue(WorkloadSchedPolicyMgr.BE_METRIC_SET.contains( + WorkloadMetricType.BE_SCAN_BYTES_FROM_REMOTE_STORAGE)); + Assert.assertEquals(TWorkloadMetricType.BE_SCAN_BYTES_FROM_REMOTE_STORAGE, + WorkloadSchedPolicyMgr.METRIC_MAP.get(WorkloadMetricType.BE_SCAN_BYTES_FROM_REMOTE_STORAGE)); + } + + @Test + public void testRemoteScanBytesMetricRejectsNegativeValue() throws UserException { + try { + // Reject negative thresholds for the remote scan bytes breaker. + WorkloadCondition.createWorkloadCondition( + new WorkloadConditionMeta("be_scan_bytes_from_remote_storage", ">", "-1")); + Assert.fail("Should throw exception for negative remote scan bytes value"); + } catch (UserException e) { + Assert.assertTrue(e.getMessage().contains("remote scan bytes")); + } + } + + @Test + public void testRemoteScanBytesMetricCanNotMixWithFeAction() throws UserException { + try { + List conditionMetas = new ArrayList<>(); + // Validate the new metric follows the existing BE-only action compatibility rules. + conditionMetas.add(new WorkloadConditionMeta("be_scan_bytes_from_remote_storage", ">", "100")); + List actionMetas = new ArrayList<>(); + actionMetas.add(new WorkloadActionMeta("set_session_variable", "workload_group=normal")); + + mgr.createWorkloadSchedPolicy("policy_remote_scan_bytes_with_fe_action", false, conditionMetas, + actionMetas, null); + Assert.fail("Should throw exception for remote scan bytes metric with FE action"); + } catch (UserException e) { + Assert.assertTrue(e.getMessage().contains("action and metric must run in FE together or run in BE together")); + } + } } diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift index d6cd95083c9722..3867cec245f12d 100644 --- a/gensrc/thrift/BackendService.thrift +++ b/gensrc/thrift/BackendService.thrift @@ -312,7 +312,9 @@ enum TWorkloadMetricType { BE_SCAN_ROWS = 1, BE_SCAN_BYTES = 2, QUERY_BE_MEMORY_BYTES = 3, - USERNAME = 4 + USERNAME = 4, + // Append the new enum value to keep existing metric ids stable across versions. + BE_SCAN_BYTES_FROM_REMOTE_STORAGE = 5 } enum TCompareOperator { diff --git a/regression-test/suites/external_table_p0/hive/test_workload_policy_remote_scan_bytes.groovy b/regression-test/suites/external_table_p0/hive/test_workload_policy_remote_scan_bytes.groovy new file mode 100644 index 00000000000000..3b6cb46f1b3bba --- /dev/null +++ b/regression-test/suites/external_table_p0/hive/test_workload_policy_remote_scan_bytes.groovy @@ -0,0 +1,143 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_workload_policy_remote_scan_bytes", "p0,external") { + String enabled = context.config.otherConfigs.get("enableHiveTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("disable Hive test.") + return + } + + String hivePrefix = "hive2" + setHivePrefix(hivePrefix) + + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String hmsPort = context.config.otherConfigs.get(hivePrefix + "HmsPort") + String catalogName = "test_workload_policy_remote_scan_bytes" + String workloadGroupName = "test_remote_scan_bytes_wg" + String policyName = "test_remote_scan_bytes_policy" + String invalidPolicyName = "test_remote_scan_bytes_invalid" + + String forComputeGroupStr = "" + String currentCgName = "" + if (isCloudMode()) { + def clusters = sql "SHOW CLUSTERS" + assertTrue(!clusters.isEmpty()) + String validCluster = clusters[0][0] + currentCgName = "${validCluster}." + forComputeGroupStr = " for ${validCluster} " + } + + try { + sql """DROP WORKLOAD POLICY IF EXISTS ${policyName}""" + sql """DROP WORKLOAD POLICY IF EXISTS ${invalidPolicyName}""" + sql """DROP WORKLOAD GROUP IF EXISTS ${workloadGroupName} ${forComputeGroupStr}""" + sql """DROP CATALOG IF EXISTS ${catalogName}""" + + sql """ + CREATE CATALOG ${catalogName} PROPERTIES ( + 'type' = 'hms', + 'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hmsPort}', + 'hadoop.username' = 'hive', + 'ipc.client.fallback-to-simple-auth-allowed' = 'true' + ) + """ + + String lineitemDb = "tpch1_parquet" + try { + def tables = sql """SHOW TABLES FROM ${catalogName}.${lineitemDb} LIKE 'lineitem'""" + if (tables.isEmpty()) { + throw new IllegalStateException("${lineitemDb}.lineitem does not exist") + } + } catch (Throwable ignored) { + lineitemDb = "tpch1" + def tables = sql """SHOW TABLES FROM ${catalogName}.${lineitemDb} LIKE 'lineitem'""" + assertFalse(tables.isEmpty(), "${catalogName} does not contain tpch1_parquet.lineitem or tpch1.lineitem") + } + + sql """ + CREATE WORKLOAD GROUP ${workloadGroupName} ${forComputeGroupStr} + PROPERTIES ('max_cpu_percent' = '100') + """ + + test { + sql """ + CREATE WORKLOAD POLICY ${invalidPolicyName} + CONDITIONS(be_scan_bytes_from_remote_storage > -1) + ACTIONS(cancel_query) + PROPERTIES('enabled' = 'false') + """ + exception "invalid remote scan bytes value" + } + + sql """ + CREATE WORKLOAD POLICY ${policyName} + CONDITIONS(be_scan_bytes_from_remote_storage > 1) + ACTIONS(cancel_query) + PROPERTIES( + 'priority' = '100', + 'workload_group' = '${currentCgName}${workloadGroupName}' + ) + """ + + def policy = sql """ + SELECT name, condition, action, priority, enabled, workload_group + FROM information_schema.workload_policy + WHERE name = '${policyName}' + """ + assertEquals(1, policy.size()) + assertEquals(policyName, policy[0][0]) + assertTrue(policy[0][1].toString().contains("be_scan_bytes_from_remote_storage > 1")) + + // Wait for FE to publish the new BE-side policy before issuing the query. + Thread.sleep(15000) + + Throwable queryException = null + sql """SET workload_group = '${workloadGroupName}'""" + sql """SET enable_file_cache = false""" + sql """SET enable_sql_cache = false""" + try { + sql """ + SELECT SUM(SLEEP(1) + l_quantity) + FROM ( + SELECT l_quantity + FROM ${catalogName}.${lineitemDb}.lineitem + LIMIT 10 + ) s + """ + } catch (Throwable t) { + queryException = t + } + assertTrue(queryException != null, "query should be cancelled by remote scan bytes workload policy") + String msg = queryException.getMessage() + logger.info("Remote scan bytes workload policy cancel message: " + msg) + assertTrue(msg != null && msg.contains("cancelled by workload policy: ${policyName}"), + "unexpected cancel policy: " + msg) + assertTrue(msg.contains("scan_bytes_from_remote_storage"), + "remote scan bytes counter is missing from cancel message: " + msg) + } finally { + try { + sql """SET workload_group = ''""" + } catch (Throwable t) { + logger.info("ignore reset workload_group failure: " + t.getMessage()) + } + sql """DROP WORKLOAD POLICY IF EXISTS ${policyName}""" + sql """DROP WORKLOAD POLICY IF EXISTS ${invalidPolicyName}""" + sql """DROP WORKLOAD GROUP IF EXISTS ${workloadGroupName} ${forComputeGroupStr}""" + sql """DROP CATALOG IF EXISTS ${catalogName}""" + } +}