Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,19 @@ public class ConfigOptions {
+ "TableLifecycleThrottler scans in-flight drops for "
+ "timeouts.");

public static final ConfigOption<Integer> COORDINATOR_REBALANCE_MAX_INFLIGHT_TASKS =
key("coordinator.rebalance.max-inflight-tasks")
.intType()
.defaultValue(1)
.withDescription(
"The maximum number of bucket-level rebalance tasks that can be "
+ "executed concurrently by the coordinator. A higher value "
+ "can speed up rebalance, while a lower value reduces the "
+ "number of simultaneous bucket movements. Setting it to 0 "
+ "pauses scheduling new rebalance tasks; already in-flight "
+ "tasks continue until they complete or time out. The value "
+ "must be non-negative.");

public static final ConfigOption<Boolean> LOG_TABLE_ALLOW_CREATION =
key("allow.create.log.tables")
.booleanType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import static org.apache.fluss.config.ConfigOptions.COORDINATOR_REBALANCE_MAX_INFLIGHT_TASKS;
import static org.apache.fluss.config.ConfigOptions.DATALAKE_FORMAT;
import static org.apache.fluss.config.ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC;
import static org.apache.fluss.config.ConfigOptions.KV_SNAPSHOT_INTERVAL;
Expand All @@ -66,6 +67,7 @@ class DynamicServerConfig {
new HashSet<>(
Arrays.asList(
DATALAKE_FORMAT.key(),
COORDINATOR_REBALANCE_MAX_INFLIGHT_TASKS.key(),
LOG_REPLICA_MIN_IN_SYNC_REPLICAS_NUMBER.key(),
KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key(),
KV_SNAPSHOT_INTERVAL.key(),
Expand Down Expand Up @@ -109,6 +111,8 @@ class DynamicServerConfig {

void register(ServerReconfigurable serverReconfigurable) {
serverReconfigures.put(serverReconfigurable.getClass(), serverReconfigurable);
serverReconfigurable.validate(currentConfig);
serverReconfigurable.reconfigure(currentConfig);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@
import org.apache.fluss.server.coordinator.event.NotifyLakeTableOffsetEvent;
import org.apache.fluss.server.coordinator.event.NotifyLeaderAndIsrResponseReceivedEvent;
import org.apache.fluss.server.coordinator.event.RebalanceEvent;
import org.apache.fluss.server.coordinator.event.RebalanceMaxInflightTasksChangedEvent;
import org.apache.fluss.server.coordinator.event.RebalanceTaskTimeoutEvent;
import org.apache.fluss.server.coordinator.event.RecoverRebalanceEvent;
import org.apache.fluss.server.coordinator.event.RemoveServerTagEvent;
import org.apache.fluss.server.coordinator.event.ResumeDropEvent;
import org.apache.fluss.server.coordinator.event.SchemaChangeEvent;
Expand Down Expand Up @@ -261,7 +263,11 @@ public CoordinatorEventProcessor(
this.internalListenerName = conf.getString(ConfigOptions.INTERNAL_LISTENER_NAME);
this.rebalanceManager =
new RebalanceManager(
this, zooKeeperClient, coordinatorEventManager, SystemClock.getInstance());
this,
zooKeeperClient,
coordinatorEventManager,
SystemClock.getInstance(),
conf);
this.ioExecutor = ioExecutor;
this.lakeTableHelper =
new LakeTableHelper(zooKeeperClient, conf.getString(ConfigOptions.REMOTE_DATA_DIR));
Expand Down Expand Up @@ -663,6 +669,13 @@ public void process(CoordinatorEvent event) {
RebalanceEvent rebalanceEvent = (RebalanceEvent) event;
completeFromCallable(
rebalanceEvent.getRespCallback(), () -> processRebalance(rebalanceEvent));
} else if (event instanceof RecoverRebalanceEvent) {
RecoverRebalanceEvent recoverRebalanceEvent = (RecoverRebalanceEvent) event;
RebalanceTask rebalanceTask = recoverRebalanceEvent.getRebalanceTask();
rebalanceManager.registerRebalance(
rebalanceTask.getRebalanceId(),
rebalanceTask.getExecutePlan(),
rebalanceTask.getRebalanceStatus());
} else if (event instanceof CancelRebalanceEvent) {
CancelRebalanceEvent cancelRebalanceEvent = (CancelRebalanceEvent) event;
completeFromCallable(
Expand All @@ -675,6 +688,11 @@ public void process(CoordinatorEvent event) {
timeoutEvent.getTableBucket());
rebalanceManager.finishRebalanceTask(
timeoutEvent.getTableBucket(), RebalanceStatus.TIMEOUT);
} else if (event instanceof RebalanceMaxInflightTasksChangedEvent) {
RebalanceMaxInflightTasksChangedEvent configChangedEvent =
(RebalanceMaxInflightTasksChangedEvent) event;
rebalanceManager.updateMaxInflightRebalanceTasks(
configChangedEvent.getMaxInflightTasks());
} else if (event instanceof ResumeDropEvent) {
// Resume-mode reconciliation queued by TableLifecycleThrottler: dispatch on the event
// thread so the @NotThreadSafe CoordinatorContext / state machines are only mutated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,7 @@ protected void initCoordinatorLeader() throws Exception {
metadataManager,
kvSnapshotLeaseManager,
clock);
dynamicConfigManager.register(coordinatorEventProcessor.getRebalanceManager());
coordinatorEventProcessor.startup();

createDefaultDatabase();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.server.coordinator.event;

/** An event fired when the rebalance max in-flight task limit changes dynamically. */
public class RebalanceMaxInflightTasksChangedEvent implements CoordinatorEvent {

private final int maxInflightTasks;

public RebalanceMaxInflightTasksChangedEvent(int maxInflightTasks) {
this.maxInflightTasks = maxInflightTasks;
}

public int getMaxInflightTasks() {
return maxInflightTasks;
}

@Override
public String toString() {
return "RebalanceMaxInflightTasksChangedEvent{maxInflightTasks=" + maxInflightTasks + "}";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.server.coordinator.event;

import org.apache.fluss.server.zk.data.RebalanceTask;

/** An event fired when a rebalance task should be recovered from ZooKeeper. */
public class RecoverRebalanceEvent implements CoordinatorEvent {

private final RebalanceTask rebalanceTask;

public RecoverRebalanceEvent(RebalanceTask rebalanceTask) {
this.rebalanceTask = rebalanceTask;
}

public RebalanceTask getRebalanceTask() {
return rebalanceTask;
}

@Override
public String toString() {
return "RecoverRebalanceEvent{rebalanceTask=" + rebalanceTask + "}";
}
}
Loading
Loading