diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java index 0301d3d47d..e16a7ac5df 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java @@ -221,6 +221,19 @@ public class ConfigOptions { + "TableLifecycleThrottler scans in-flight drops for " + "timeouts."); + public static final ConfigOption COORDINATOR_REBALANCE_MAX_INFLIGHT_TASKS = + key("coordinator.rebalance.max-inflight-tasks") + .intType() + .defaultValue(1) + .withDescription( + "The maximum number of bucket-level rebalance tasks that can be " + + "executed concurrently by the coordinator. A higher value " + + "can speed up rebalance, while a lower value reduces the " + + "number of simultaneous bucket movements. Setting it to 0 " + + "pauses scheduling new rebalance tasks; already in-flight " + + "tasks continue until they complete or time out. The value " + + "must be non-negative."); + public static final ConfigOption LOG_TABLE_ALLOW_CREATION = key("allow.create.log.tables") .booleanType() diff --git a/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java b/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java index dee0b1fade..78f4f58dab 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java @@ -41,6 +41,7 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import static org.apache.fluss.config.ConfigOptions.COORDINATOR_REBALANCE_MAX_INFLIGHT_TASKS; import static org.apache.fluss.config.ConfigOptions.DATALAKE_FORMAT; import static org.apache.fluss.config.ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC; import static org.apache.fluss.config.ConfigOptions.KV_SNAPSHOT_INTERVAL; @@ -66,6 +67,7 @@ class DynamicServerConfig { new HashSet<>( Arrays.asList( DATALAKE_FORMAT.key(), + COORDINATOR_REBALANCE_MAX_INFLIGHT_TASKS.key(), LOG_REPLICA_MIN_IN_SYNC_REPLICAS_NUMBER.key(), KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key(), KV_SNAPSHOT_INTERVAL.key(), @@ -109,6 +111,8 @@ class DynamicServerConfig { void register(ServerReconfigurable serverReconfigurable) { serverReconfigures.put(serverReconfigurable.getClass(), serverReconfigurable); + serverReconfigurable.validate(currentConfig); + serverReconfigurable.reconfigure(currentConfig); } /** diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java index 537e2df63f..ab4fdee895 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java @@ -85,7 +85,9 @@ import org.apache.fluss.server.coordinator.event.NotifyLakeTableOffsetEvent; import org.apache.fluss.server.coordinator.event.NotifyLeaderAndIsrResponseReceivedEvent; import org.apache.fluss.server.coordinator.event.RebalanceEvent; +import org.apache.fluss.server.coordinator.event.RebalanceMaxInflightTasksChangedEvent; import org.apache.fluss.server.coordinator.event.RebalanceTaskTimeoutEvent; +import org.apache.fluss.server.coordinator.event.RecoverRebalanceEvent; import org.apache.fluss.server.coordinator.event.RemoveServerTagEvent; import org.apache.fluss.server.coordinator.event.ResumeDropEvent; import org.apache.fluss.server.coordinator.event.SchemaChangeEvent; @@ -261,7 +263,11 @@ public CoordinatorEventProcessor( this.internalListenerName = conf.getString(ConfigOptions.INTERNAL_LISTENER_NAME); this.rebalanceManager = new RebalanceManager( - this, zooKeeperClient, coordinatorEventManager, SystemClock.getInstance()); + this, + zooKeeperClient, + coordinatorEventManager, + SystemClock.getInstance(), + conf); this.ioExecutor = ioExecutor; this.lakeTableHelper = new LakeTableHelper(zooKeeperClient, conf.getString(ConfigOptions.REMOTE_DATA_DIR)); @@ -663,6 +669,13 @@ public void process(CoordinatorEvent event) { RebalanceEvent rebalanceEvent = (RebalanceEvent) event; completeFromCallable( rebalanceEvent.getRespCallback(), () -> processRebalance(rebalanceEvent)); + } else if (event instanceof RecoverRebalanceEvent) { + RecoverRebalanceEvent recoverRebalanceEvent = (RecoverRebalanceEvent) event; + RebalanceTask rebalanceTask = recoverRebalanceEvent.getRebalanceTask(); + rebalanceManager.registerRebalance( + rebalanceTask.getRebalanceId(), + rebalanceTask.getExecutePlan(), + rebalanceTask.getRebalanceStatus()); } else if (event instanceof CancelRebalanceEvent) { CancelRebalanceEvent cancelRebalanceEvent = (CancelRebalanceEvent) event; completeFromCallable( @@ -675,6 +688,11 @@ public void process(CoordinatorEvent event) { timeoutEvent.getTableBucket()); rebalanceManager.finishRebalanceTask( timeoutEvent.getTableBucket(), RebalanceStatus.TIMEOUT); + } else if (event instanceof RebalanceMaxInflightTasksChangedEvent) { + RebalanceMaxInflightTasksChangedEvent configChangedEvent = + (RebalanceMaxInflightTasksChangedEvent) event; + rebalanceManager.updateMaxInflightRebalanceTasks( + configChangedEvent.getMaxInflightTasks()); } else if (event instanceof ResumeDropEvent) { // Resume-mode reconciliation queued by TableLifecycleThrottler: dispatch on the event // thread so the @NotThreadSafe CoordinatorContext / state machines are only mutated diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java index 2c4fcf5e45..665e4f2ff4 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java @@ -338,6 +338,7 @@ protected void initCoordinatorLeader() throws Exception { metadataManager, kvSnapshotLeaseManager, clock); + dynamicConfigManager.register(coordinatorEventProcessor.getRebalanceManager()); coordinatorEventProcessor.startup(); createDefaultDatabase(); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/RebalanceMaxInflightTasksChangedEvent.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/RebalanceMaxInflightTasksChangedEvent.java new file mode 100644 index 0000000000..1be92bfb08 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/RebalanceMaxInflightTasksChangedEvent.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator.event; + +/** An event fired when the rebalance max in-flight task limit changes dynamically. */ +public class RebalanceMaxInflightTasksChangedEvent implements CoordinatorEvent { + + private final int maxInflightTasks; + + public RebalanceMaxInflightTasksChangedEvent(int maxInflightTasks) { + this.maxInflightTasks = maxInflightTasks; + } + + public int getMaxInflightTasks() { + return maxInflightTasks; + } + + @Override + public String toString() { + return "RebalanceMaxInflightTasksChangedEvent{maxInflightTasks=" + maxInflightTasks + "}"; + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/RecoverRebalanceEvent.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/RecoverRebalanceEvent.java new file mode 100644 index 0000000000..72679846dd --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/RecoverRebalanceEvent.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator.event; + +import org.apache.fluss.server.zk.data.RebalanceTask; + +/** An event fired when a rebalance task should be recovered from ZooKeeper. */ +public class RecoverRebalanceEvent implements CoordinatorEvent { + + private final RebalanceTask rebalanceTask; + + public RecoverRebalanceEvent(RebalanceTask rebalanceTask) { + this.rebalanceTask = rebalanceTask; + } + + public RebalanceTask getRebalanceTask() { + return rebalanceTask; + } + + @Override + public String toString() { + return "RecoverRebalanceEvent{rebalanceTask=" + rebalanceTask + "}"; + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManager.java index 9f2f350bdb..10ae4aa2f4 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManager.java @@ -23,12 +23,18 @@ import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket; import org.apache.fluss.cluster.rebalance.RebalanceStatus; import org.apache.fluss.cluster.rebalance.ServerTag; +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.config.cluster.ServerReconfigurable; +import org.apache.fluss.exception.ConfigException; import org.apache.fluss.exception.NoRebalanceInProgressException; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.server.coordinator.CoordinatorContext; import org.apache.fluss.server.coordinator.CoordinatorEventProcessor; import org.apache.fluss.server.coordinator.event.EventManager; +import org.apache.fluss.server.coordinator.event.RebalanceMaxInflightTasksChangedEvent; import org.apache.fluss.server.coordinator.event.RebalanceTaskTimeoutEvent; +import org.apache.fluss.server.coordinator.event.RecoverRebalanceEvent; import org.apache.fluss.server.coordinator.rebalance.goal.Goal; import org.apache.fluss.server.coordinator.rebalance.goal.GoalOptimizer; import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel; @@ -68,14 +74,16 @@ import static org.apache.fluss.cluster.rebalance.RebalanceStatus.NOT_STARTED; import static org.apache.fluss.cluster.rebalance.RebalanceStatus.REBALANCING; import static org.apache.fluss.utils.Preconditions.checkArgument; -import static org.apache.fluss.utils.Preconditions.checkNotNull; /** * A rebalance manager to generate rebalance plan, and execution rebalance plan. * - *

This manager can only be used in {@link CoordinatorEventProcessor} as a single threaded model. + *

This manager is used in {@link CoordinatorEventProcessor}'s single-threaded event model. + * Non-event threads, such as startup recovery, dynamic configuration callbacks, and the timeout + * checker, must enqueue coordinator events instead of directly submitting tasks or advancing + * rebalance state. */ -public class RebalanceManager { +public class RebalanceManager implements ServerReconfigurable { private static final Logger LOG = LoggerFactory.getLogger(RebalanceManager.class); /** Hardcoded timeout for an in-flight rebalance task: 2 minutes. */ @@ -90,47 +98,48 @@ public class RebalanceManager { private final Clock clock; private final ScheduledExecutorService timeoutChecker; - /** A queue of in progress table bucket to rebalance. */ - private final Queue inProgressRebalanceTasksQueue = new ArrayDeque<>(); + /** A queue of pending table buckets to rebalance. */ + private final Queue pendingRebalanceTasksQueue = new ArrayDeque<>(); /** A mapping from table bucket to rebalance status of pending and running tasks. */ private final Map inProgressRebalanceTasks = new ConcurrentHashMap<>(); + /** A mapping from running table bucket to the time when the task was started. */ + private final Map inflightRebalanceTaskStartMs = new ConcurrentHashMap<>(); + /** A mapping from table bucket to rebalance status of failed or completed tasks. */ private final Map finishedRebalanceTasks = new ConcurrentHashMap<>(); private final GoalOptimizer goalOptimizer; + private volatile int maxInflightRebalanceTasks; + private volatile int queuedMaxInflightRebalanceTasks; private volatile long registerTime; private volatile @Nullable RebalanceStatus rebalanceStatus; private volatile @Nullable String currentRebalanceId; private volatile boolean isClosed = false; - /** - * Timestamp when the current in-flight task was started, or -1 if idle. - * - *

Write ordering contract (volatile publication idiom): always write {@code - * inflightTaskStartMs} BEFORE {@code inflightTaskBucket} when setting, and clear {@code - * inflightTaskBucket} BEFORE {@code inflightTaskStartMs} when resetting. The timeout checker - * reads in reverse order (bucket first, then startMs), ensuring it never observes a stale - * startMs paired with a new bucket. - */ - private volatile long inflightTaskStartMs = -1; - - /** The bucket of the current in-flight task, or null if idle. Acts as the "gate" variable. */ - private volatile @Nullable TableBucket inflightTaskBucket; - public RebalanceManager( CoordinatorEventProcessor eventProcessor, ZooKeeperClient zkClient, EventManager eventManager, Clock clock) { + this(eventProcessor, zkClient, eventManager, clock, new Configuration()); + } + + public RebalanceManager( + CoordinatorEventProcessor eventProcessor, + ZooKeeperClient zkClient, + EventManager eventManager, + Clock clock, + Configuration conf) { this( eventProcessor, zkClient, eventManager, clock, + conf, Executors.newScheduledThreadPool( 1, new ExecutorThreadFactory("rebalance-timeout"))); } @@ -142,12 +151,27 @@ public RebalanceManager( EventManager eventManager, Clock clock, ScheduledExecutorService timeoutChecker) { + this(eventProcessor, zkClient, eventManager, clock, new Configuration(), timeoutChecker); + } + + @VisibleForTesting + RebalanceManager( + CoordinatorEventProcessor eventProcessor, + ZooKeeperClient zkClient, + EventManager eventManager, + Clock clock, + Configuration conf, + ScheduledExecutorService timeoutChecker) { this.eventProcessor = eventProcessor; this.zkClient = zkClient; this.eventManager = eventManager; this.clock = clock == null ? SystemClock.getInstance() : clock; this.timeoutChecker = timeoutChecker; this.goalOptimizer = new GoalOptimizer(); + validate(conf); + this.maxInflightRebalanceTasks = + conf.get(ConfigOptions.COORDINATOR_REBALANCE_MAX_INFLIGHT_TASKS); + this.queuedMaxInflightRebalanceTasks = maxInflightRebalanceTasks; } public void startup() { @@ -172,15 +196,51 @@ public void start() { return currentRebalanceId; } + @Override + public void validate(Configuration newConfig) throws ConfigException { + int newMaxInflightRebalanceTasks = + newConfig.get(ConfigOptions.COORDINATOR_REBALANCE_MAX_INFLIGHT_TASKS); + if (newMaxInflightRebalanceTasks < 0) { + throw new ConfigException( + String.format( + "Invalid %s: must be non-negative, but was %s", + ConfigOptions.COORDINATOR_REBALANCE_MAX_INFLIGHT_TASKS.key(), + newMaxInflightRebalanceTasks)); + } + } + + @Override + public synchronized void reconfigure(Configuration newConfig) { + int newMaxInflightRebalanceTasks = + newConfig.get(ConfigOptions.COORDINATOR_REBALANCE_MAX_INFLIGHT_TASKS); + if (newMaxInflightRebalanceTasks == queuedMaxInflightRebalanceTasks) { + LOG.debug( + "{} unchanged: {}", + ConfigOptions.COORDINATOR_REBALANCE_MAX_INFLIGHT_TASKS.key(), + newMaxInflightRebalanceTasks); + return; + } + + int oldQueuedMaxInflightRebalanceTasks = queuedMaxInflightRebalanceTasks; + queuedMaxInflightRebalanceTasks = newMaxInflightRebalanceTasks; + LOG.info( + "{} change queued: {} -> {}", + ConfigOptions.COORDINATOR_REBALANCE_MAX_INFLIGHT_TASKS.key(), + oldQueuedMaxInflightRebalanceTasks, + newMaxInflightRebalanceTasks); + + if (!isClosed) { + eventManager.put( + new RebalanceMaxInflightTasksChangedEvent(newMaxInflightRebalanceTasks)); + } + } + private void initialize() { try { zkClient.getRebalanceTask() .ifPresent( - rebalancePlan -> - registerRebalance( - rebalancePlan.getRebalanceId(), - rebalancePlan.getExecutePlan(), - rebalancePlan.getRebalanceStatus())); + rebalanceTask -> + eventManager.put(new RecoverRebalanceEvent(rebalanceTask))); } catch (Exception e) { LOG.error( "Failed to get rebalance plan from zookeeper, it will be treated as no" @@ -189,7 +249,7 @@ private void initialize() { } } - public void registerRebalance( + public synchronized void registerRebalance( String rebalanceId, Map rebalancePlan, RebalanceStatus newStatus) { @@ -197,11 +257,9 @@ public void registerRebalance( registerTime = System.currentTimeMillis(); // first clear all exists tasks. inProgressRebalanceTasks.clear(); - inProgressRebalanceTasksQueue.clear(); + pendingRebalanceTasksQueue.clear(); + inflightRebalanceTaskStartMs.clear(); finishedRebalanceTasks.clear(); - // Clear gate (bucket) first, then data (startMs). - inflightTaskBucket = null; - inflightTaskStartMs = -1; currentRebalanceId = rebalanceId; if (rebalancePlan.isEmpty()) { @@ -215,51 +273,53 @@ public void registerRebalance( finishedRebalanceTasks.put( tableBucket, RebalanceResultForBucket.of(planForBucket, newStatus)); } else { - inProgressRebalanceTasksQueue.add(tableBucket); + pendingRebalanceTasksQueue.add(tableBucket); inProgressRebalanceTasks.put( tableBucket, RebalanceResultForBucket.of(planForBucket, NOT_STARTED)); } })); - if (!inProgressRebalanceTasksQueue.isEmpty()) { - // Trigger one rebalance task to execute. + if (!pendingRebalanceTasksQueue.isEmpty()) { + // Trigger rebalance tasks to execute. rebalanceStatus = REBALANCING; - processNewRebalanceTask(); + processNewRebalanceTasks(); } else { rebalanceStatus = newStatus; } } - public void finishRebalanceTask(TableBucket tableBucket, RebalanceStatus statusForBucket) { + public synchronized void finishRebalanceTask( + TableBucket tableBucket, RebalanceStatus statusForBucket) { checkNotClosed(); - if (inProgressRebalanceTasksQueue.contains(tableBucket)) { - inProgressRebalanceTasksQueue.remove(tableBucket); - RebalanceResultForBucket resultForBucket = inProgressRebalanceTasks.remove(tableBucket); - checkNotNull(resultForBucket, "RebalanceResultForBucket is null."); - finishedRebalanceTasks.put( - tableBucket, - RebalanceResultForBucket.of(resultForBucket.plan(), statusForBucket)); - // Clear gate (bucket) first, then data (startMs). - inflightTaskBucket = null; - inflightTaskStartMs = -1; - LOG.info( - "Rebalance task {} in progress: {} tasks pending, {} completed.", - currentRebalanceId, - inProgressRebalanceTasksQueue.size(), - finishedRebalanceTasks.size()); - - if (inProgressRebalanceTasksQueue.isEmpty()) { - // All rebalance tasks are completed. - completeRebalance(); - } else { - // Trigger one rebalance task to execute. - processNewRebalanceTask(); - } + RebalanceResultForBucket resultForBucket = inProgressRebalanceTasks.get(tableBucket); + if (resultForBucket == null || resultForBucket.status() != REBALANCING) { + return; + } + + inProgressRebalanceTasks.remove(tableBucket); + pendingRebalanceTasksQueue.remove(tableBucket); + inflightRebalanceTaskStartMs.remove(tableBucket); + finishedRebalanceTasks.put( + tableBucket, RebalanceResultForBucket.of(resultForBucket.plan(), statusForBucket)); + LOG.info( + "Rebalance task {} in progress: {} tasks pending, {} tasks in-flight, {} completed.", + currentRebalanceId, + pendingRebalanceTasksQueue.size(), + inflightRebalanceTaskStartMs.size(), + finishedRebalanceTasks.size()); + + if (inProgressRebalanceTasks.isEmpty()) { + // All rebalance tasks are completed. + completeRebalance(); + } else { + // Trigger new rebalance tasks to execute if there is available capacity. + processNewRebalanceTasks(); } } - public @Nullable RebalanceProgress listRebalanceProgress(@Nullable String rebalanceId) { + public synchronized @Nullable RebalanceProgress listRebalanceProgress( + @Nullable String rebalanceId) { checkNotClosed(); if (rebalanceId != null && currentRebalanceId != null @@ -285,7 +345,7 @@ public void finishRebalanceTask(TableBucket tableBucket, RebalanceStatus statusF currentRebalanceId, rebalanceStatus, 0.0, progressForBucketMap); } - public void cancelRebalance(@Nullable String rebalanceId) { + public synchronized void cancelRebalance(@Nullable String rebalanceId) { checkNotClosed(); if (rebalanceId != null @@ -321,20 +381,18 @@ public void cancelRebalance(@Nullable String rebalanceId) { } rebalanceStatus = CANCELED; - inProgressRebalanceTasksQueue.clear(); + pendingRebalanceTasksQueue.clear(); inProgressRebalanceTasks.clear(); - // Clear gate (bucket) first, then data (startMs). - inflightTaskBucket = null; - inflightTaskStartMs = -1; + inflightRebalanceTaskStartMs.clear(); // Here, it will not clear finishedRebalanceTasks, because it will be used by // listRebalanceProgress. It will be cleared when next register. LOG.info("Cancel rebalance task success."); } - public boolean hasInProgressRebalance() { + public synchronized boolean hasInProgressRebalance() { checkNotClosed(); - return !inProgressRebalanceTasks.isEmpty() || !inProgressRebalanceTasksQueue.isEmpty(); + return !inProgressRebalanceTasks.isEmpty() || !pendingRebalanceTasksQueue.isEmpty(); } public RebalanceTask generateRebalanceTask(List goalsByPriority) { @@ -366,28 +424,62 @@ public RebalanceTask generateRebalanceTask(List goalsByPriority) { return buildRebalanceTask(rebalanceId, rebalancePlanForBuckets); } - public @Nullable RebalancePlanForBucket getRebalancePlanForBucket(TableBucket tableBucket) { + public synchronized @Nullable RebalancePlanForBucket getRebalancePlanForBucket( + TableBucket tableBucket) { checkNotClosed(); RebalanceResultForBucket resultForBucket = inProgressRebalanceTasks.get(tableBucket); - if (resultForBucket != null) { + if (resultForBucket != null && resultForBucket.status() == REBALANCING) { return resultForBucket.plan(); } return null; } - private void processNewRebalanceTask() { - TableBucket tableBucket = inProgressRebalanceTasksQueue.peek(); - if (tableBucket != null && inProgressRebalanceTasks.containsKey(tableBucket)) { - // Write data (startMs) first, then publish gate (bucket). - inflightTaskStartMs = clock.milliseconds(); - inflightTaskBucket = tableBucket; + private void processNewRebalanceTasks() { + while (inflightRebalanceTaskStartMs.size() < maxInflightRebalanceTasks) { + TableBucket tableBucket = pendingRebalanceTasksQueue.poll(); + if (tableBucket == null) { + return; + } + RebalanceResultForBucket resultForBucket = inProgressRebalanceTasks.get(tableBucket); + if (resultForBucket == null || resultForBucket.status() != NOT_STARTED) { + continue; + } + RebalanceResultForBucket rebalanceResultForBucket = RebalanceResultForBucket.of(resultForBucket.plan(), REBALANCING); + inProgressRebalanceTasks.put(tableBucket, rebalanceResultForBucket); + inflightRebalanceTaskStartMs.put(tableBucket, clock.milliseconds()); eventProcessor.tryToExecuteRebalanceTask(rebalanceResultForBucket.plan()); } } + public synchronized void updateMaxInflightRebalanceTasks(int newMaxInflightRebalanceTasks) { + checkArgument( + newMaxInflightRebalanceTasks >= 0, + "%s must be non-negative.", + ConfigOptions.COORDINATOR_REBALANCE_MAX_INFLIGHT_TASKS.key()); + int oldMaxInflightRebalanceTasks = maxInflightRebalanceTasks; + if (newMaxInflightRebalanceTasks == oldMaxInflightRebalanceTasks) { + LOG.debug( + "{} unchanged: {}", + ConfigOptions.COORDINATOR_REBALANCE_MAX_INFLIGHT_TASKS.key(), + newMaxInflightRebalanceTasks); + return; + } + + maxInflightRebalanceTasks = newMaxInflightRebalanceTasks; + LOG.info( + "{} reconfigured: {} -> {}", + ConfigOptions.COORDINATOR_REBALANCE_MAX_INFLIGHT_TASKS.key(), + oldMaxInflightRebalanceTasks, + newMaxInflightRebalanceTasks); + + if (!isClosed) { + processNewRebalanceTasks(); + } + } + private void completeRebalance() { checkNotClosed(); try { @@ -409,7 +501,8 @@ private void completeRebalance() { rebalanceStatus = COMPLETED; inProgressRebalanceTasks.clear(); - inProgressRebalanceTasksQueue.clear(); + pendingRebalanceTasksQueue.clear(); + inflightRebalanceTaskStartMs.clear(); // Here, it will not clear finishedRebalanceTasks, because it will be used by // listRebalanceProgress. It will be cleared when next register. @@ -489,26 +582,19 @@ private void checkTimeoutSafely() { @VisibleForTesting void checkTimeout() { - // Read gate (bucket) first, then data (startMs). - // If bucket is non-null, happens-before guarantees startMs is at least as - // fresh as the value written before bucket was published. - TableBucket bucket = inflightTaskBucket; - long startMs = inflightTaskStartMs; - if (bucket == null || startMs < 0) { - return; - } - long elapsed = clock.milliseconds() - startMs; - if (elapsed > REBALANCE_TASK_TIMEOUT_MS) { - LOG.warn( - "In-flight rebalance task for {} timed out after {}ms. " - + "Treating it as timed out and advancing to the next task.", - bucket, - elapsed); - // Clear gate (bucket) first, then data (startMs), matching the - // publication idiom so the next checkTimeout sees bucket==null. - inflightTaskBucket = null; - inflightTaskStartMs = -1; - eventManager.put(new RebalanceTaskTimeoutEvent(bucket)); + for (Map.Entry entry : + new HashMap<>(inflightRebalanceTaskStartMs).entrySet()) { + TableBucket bucket = entry.getKey(); + long startMs = entry.getValue(); + long elapsed = clock.milliseconds() - startMs; + if (elapsed > REBALANCE_TASK_TIMEOUT_MS) { + LOG.warn( + "In-flight rebalance task for {} timed out after {}ms. " + + "Treating it as timed out and advancing to the next task.", + bucket, + elapsed); + eventManager.put(new RebalanceTaskTimeoutEvent(bucket)); + } } } @@ -531,4 +617,9 @@ public ClusterModel buildClusterModel() { RebalanceStatus getRebalanceStatus() { return rebalanceStatus; } + + @VisibleForTesting + int getMaxInflightRebalanceTasks() { + return maxInflightRebalanceTasks; + } } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/DynamicConfigChangeTest.java b/fluss-server/src/test/java/org/apache/fluss/server/DynamicConfigChangeTest.java index c41afa0f6d..826f336fd8 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/DynamicConfigChangeTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/DynamicConfigChangeTest.java @@ -465,6 +465,164 @@ public void reconfigure(Configuration newConfig) { assertThat(reconfiguredInterval.get()).isEqualTo(Duration.ofMinutes(5)); } + @Test + void testDynamicRebalanceMaxInflightTasksChange() throws Exception { + Configuration configuration = new Configuration(); + configuration.set(ConfigOptions.COORDINATOR_REBALANCE_MAX_INFLIGHT_TASKS, 1); + + DynamicConfigManager dynamicConfigManager = + new DynamicConfigManager(zookeeperClient, configuration, true); + + AtomicInteger reconfiguredValue = new AtomicInteger(); + dynamicConfigManager.register( + new ServerReconfigurable() { + @Override + public void validate(Configuration newConfig) throws ConfigException { + int newValue = + newConfig.get( + ConfigOptions.COORDINATOR_REBALANCE_MAX_INFLIGHT_TASKS); + if (newValue < 0) { + throw new ConfigException( + String.format( + "Invalid %s: must be non-negative, but was %s", + ConfigOptions.COORDINATOR_REBALANCE_MAX_INFLIGHT_TASKS + .key(), + newValue)); + } + } + + @Override + public void reconfigure(Configuration newConfig) { + reconfiguredValue.set( + newConfig.get( + ConfigOptions.COORDINATOR_REBALANCE_MAX_INFLIGHT_TASKS)); + } + }); + dynamicConfigManager.startup(); + assertThat(reconfiguredValue.get()).isEqualTo(1); + + dynamicConfigManager.alterConfigs( + Collections.singletonList( + new AlterConfig( + ConfigOptions.COORDINATOR_REBALANCE_MAX_INFLIGHT_TASKS.key(), + "5", + AlterConfigOpType.SET))); + + Map zkConfig = zookeeperClient.fetchEntityConfig(); + assertThat(zkConfig.get(ConfigOptions.COORDINATOR_REBALANCE_MAX_INFLIGHT_TASKS.key())) + .isEqualTo("5"); + assertThat(reconfiguredValue.get()).isEqualTo(5); + + dynamicConfigManager.alterConfigs( + Collections.singletonList( + new AlterConfig( + ConfigOptions.COORDINATOR_REBALANCE_MAX_INFLIGHT_TASKS.key(), + "0", + AlterConfigOpType.SET))); + + zkConfig = zookeeperClient.fetchEntityConfig(); + assertThat(zkConfig.get(ConfigOptions.COORDINATOR_REBALANCE_MAX_INFLIGHT_TASKS.key())) + .isEqualTo("0"); + assertThat(reconfiguredValue.get()).isZero(); + + dynamicConfigManager.alterConfigs( + Collections.singletonList( + new AlterConfig( + ConfigOptions.COORDINATOR_REBALANCE_MAX_INFLIGHT_TASKS.key(), + null, + AlterConfigOpType.DELETE))); + + zkConfig = zookeeperClient.fetchEntityConfig(); + assertThat(zkConfig) + .doesNotContainKey(ConfigOptions.COORDINATOR_REBALANCE_MAX_INFLIGHT_TASKS.key()); + assertThat(reconfiguredValue.get()).isEqualTo(1); + } + + @Test + void testPreventInvalidRebalanceMaxInflightTasks() throws Exception { + Configuration configuration = new Configuration(); + configuration.set(ConfigOptions.COORDINATOR_REBALANCE_MAX_INFLIGHT_TASKS, 1); + + DynamicConfigManager dynamicConfigManager = + new DynamicConfigManager(zookeeperClient, configuration, true); + + AtomicInteger reconfiguredValue = new AtomicInteger(); + dynamicConfigManager.register( + new ServerReconfigurable() { + @Override + public void validate(Configuration newConfig) throws ConfigException { + int newValue = + newConfig.get( + ConfigOptions.COORDINATOR_REBALANCE_MAX_INFLIGHT_TASKS); + if (newValue < 0) { + throw new ConfigException( + String.format( + "Invalid %s: must be non-negative, but was %s", + ConfigOptions.COORDINATOR_REBALANCE_MAX_INFLIGHT_TASKS + .key(), + newValue)); + } + } + + @Override + public void reconfigure(Configuration newConfig) { + reconfiguredValue.set( + newConfig.get( + ConfigOptions.COORDINATOR_REBALANCE_MAX_INFLIGHT_TASKS)); + } + }); + dynamicConfigManager.startup(); + + assertThatThrownBy( + () -> + dynamicConfigManager.alterConfigs( + Collections.singletonList( + new AlterConfig( + ConfigOptions + .COORDINATOR_REBALANCE_MAX_INFLIGHT_TASKS + .key(), + "-1", + AlterConfigOpType.SET)))) + .isInstanceOf(ConfigException.class) + .hasMessageContaining("must be non-negative"); + + assertThat(reconfiguredValue.get()).isEqualTo(1); + assertThat(zookeeperClient.fetchEntityConfig()) + .doesNotContainKey(ConfigOptions.COORDINATOR_REBALANCE_MAX_INFLIGHT_TASKS.key()); + } + + @Test + void testLateRegisterReceivesCurrentDynamicRebalanceMaxInflightTasks() throws Exception { + Configuration configuration = new Configuration(); + configuration.set(ConfigOptions.COORDINATOR_REBALANCE_MAX_INFLIGHT_TASKS, 1); + + DynamicConfigManager dynamicConfigManager = + new DynamicConfigManager(zookeeperClient, configuration, true); + dynamicConfigManager.startup(); + dynamicConfigManager.alterConfigs( + Collections.singletonList( + new AlterConfig( + ConfigOptions.COORDINATOR_REBALANCE_MAX_INFLIGHT_TASKS.key(), + "4", + AlterConfigOpType.SET))); + + AtomicInteger reconfiguredValue = new AtomicInteger(); + dynamicConfigManager.register( + new ServerReconfigurable() { + @Override + public void validate(Configuration newConfig) throws ConfigException {} + + @Override + public void reconfigure(Configuration newConfig) { + reconfiguredValue.set( + newConfig.get( + ConfigOptions.COORDINATOR_REBALANCE_MAX_INFLIGHT_TASKS)); + } + }); + + assertThat(reconfiguredValue.get()).isEqualTo(4); + } + @Test void testDynamicReconfigurationOfRemoteDataDirs() throws Exception { Configuration configuration = new Configuration(); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java index 001a90f74e..c4ca705ebf 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java @@ -1994,7 +1994,14 @@ private static void drainPendingNotifyTriggers( private int countInProgressRebalanceTasks(TableBucket... buckets) { int count = 0; for (TableBucket tb : buckets) { - if (eventProcessor.getRebalanceManager().getRebalancePlanForBucket(tb) != null) { + RebalanceStatus status = + eventProcessor + .getRebalanceManager() + .listRebalanceProgress(null) + .progressForBucketMap() + .get(tb) + .status(); + if (!RebalanceStatus.FINAL_STATUSES.contains(status)) { count++; } } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManagerTest.java index 0360b6a4e8..55eede1563 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManagerTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManagerTest.java @@ -32,7 +32,9 @@ import org.apache.fluss.server.coordinator.TestCoordinatorChannelManager; import org.apache.fluss.server.coordinator.event.CoordinatorEvent; import org.apache.fluss.server.coordinator.event.EventManager; +import org.apache.fluss.server.coordinator.event.RebalanceMaxInflightTasksChangedEvent; import org.apache.fluss.server.coordinator.event.RebalanceTaskTimeoutEvent; +import org.apache.fluss.server.coordinator.event.RecoverRebalanceEvent; import org.apache.fluss.server.coordinator.lease.KvSnapshotLeaseManager; import org.apache.fluss.server.coordinator.remote.RemoteDirDynamicLoader; import org.apache.fluss.server.metadata.CoordinatorMetadataCache; @@ -57,6 +59,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Executors; @@ -162,13 +165,46 @@ void testRebalanceWithoutTask() throws Exception { .hasValue(new RebalanceTask(rebalanceId, COMPLETED, new HashMap<>())); } + @Test + void testStartupQueuesRecoverRebalanceEvent() throws Exception { + ManualClock clock = new ManualClock(0L); + RecordingEventManager eventManager = new RecordingEventManager(); + NoOpScheduledExecutor executor = new NoOpScheduledExecutor(); + RecordingCoordinatorEventProcessor eventProcessor = + buildRecordingCoordinatorEventProcessor(new Configuration()); + RebalanceManager manager = + new RebalanceManager( + eventProcessor, zookeeperClient, eventManager, clock, executor); + + Map plan = createRebalancePlan(2); + RebalanceTask rebalanceTask = new RebalanceTask("recover-test", NOT_STARTED, plan); + zookeeperClient.registerRebalanceTask(rebalanceTask); + + manager.startup(); + + assertThat(eventProcessor.executedPlans).isEmpty(); + assertThat(eventManager.events).hasSize(1); + assertThat(eventManager.events.get(0)).isInstanceOf(RecoverRebalanceEvent.class); + + RecoverRebalanceEvent recoverEvent = (RecoverRebalanceEvent) eventManager.events.get(0); + assertThat(recoverEvent.getRebalanceTask()).isEqualTo(rebalanceTask); + + manager.registerRebalance( + rebalanceTask.getRebalanceId(), + rebalanceTask.getExecutePlan(), + rebalanceTask.getRebalanceStatus()); + assertThat(eventProcessor.executedPlans).hasSize(1); + + manager.close(); + } + @Test void testTimeoutEnqueuesEvent() throws Exception { ManualClock clock = new ManualClock(0L); RecordingEventManager eventManager = new RecordingEventManager(); NoOpScheduledExecutor executor = new NoOpScheduledExecutor(); - CoordinatorEventProcessor eventProcessor = - buildCoordinatorEventProcessor(new Configuration()); + RecordingCoordinatorEventProcessor eventProcessor = + buildRecordingCoordinatorEventProcessor(new Configuration()); RebalanceManager manager = new RebalanceManager( @@ -177,7 +213,7 @@ void testTimeoutEnqueuesEvent() throws Exception { TableBucket tb1 = new TableBucket(1L, 0); TableBucket tb2 = new TableBucket(1L, 1); - Map plan = new HashMap<>(); + Map plan = new LinkedHashMap<>(); plan.put( tb1, new RebalancePlanForBucket( @@ -189,6 +225,7 @@ void testTimeoutEnqueuesEvent() throws Exception { zookeeperClient.registerRebalanceTask(new RebalanceTask("timeout-test", NOT_STARTED, plan)); manager.registerRebalance("timeout-test", plan, NOT_STARTED); + assertThat(eventProcessor.executedPlans).hasSize(1); // Not yet timed out. clock.advanceTime(Duration.ofMillis(100_000)); @@ -205,8 +242,11 @@ void testTimeoutEnqueuesEvent() throws Exception { (RebalanceTaskTimeoutEvent) eventManager.events.get(0); assertThat(timeoutEvent.getTableBucket()).isEqualTo(tb1); - // A second checkTimeout() should NOT enqueue another event because the - // inflight state was cleared after the first timeout. + manager.finishRebalanceTask(tb1, TIMEOUT); + assertThat(eventProcessor.executedPlans).hasSize(2); + + // A second checkTimeout() should NOT enqueue another event because the timeout event + // has been handled on the coordinator event thread. clock.advanceTime(Duration.ofMillis(30_000)); manager.checkTimeout(); assertThat(eventManager.events).hasSize(1); @@ -219,8 +259,8 @@ void testTimeoutAfterCompletionIsNoOp() throws Exception { ManualClock clock = new ManualClock(0L); RecordingEventManager eventManager = new RecordingEventManager(); NoOpScheduledExecutor executor = new NoOpScheduledExecutor(); - CoordinatorEventProcessor eventProcessor = - buildCoordinatorEventProcessor(new Configuration()); + RecordingCoordinatorEventProcessor eventProcessor = + buildRecordingCoordinatorEventProcessor(new Configuration()); RebalanceManager manager = new RebalanceManager( @@ -228,7 +268,7 @@ void testTimeoutAfterCompletionIsNoOp() throws Exception { manager.startup(); TableBucket tb1 = new TableBucket(1L, 0); - Map plan = new HashMap<>(); + Map plan = new LinkedHashMap<>(); plan.put( tb1, new RebalancePlanForBucket( @@ -256,8 +296,8 @@ void testTimeoutTreatsTaskAsCompleted() throws Exception { ManualClock clock = new ManualClock(0L); RecordingEventManager eventManager = new RecordingEventManager(); NoOpScheduledExecutor executor = new NoOpScheduledExecutor(); - CoordinatorEventProcessor eventProcessor = - buildCoordinatorEventProcessor(new Configuration()); + RecordingCoordinatorEventProcessor eventProcessor = + buildRecordingCoordinatorEventProcessor(new Configuration()); RebalanceManager manager = new RebalanceManager( @@ -266,7 +306,7 @@ void testTimeoutTreatsTaskAsCompleted() throws Exception { TableBucket tb1 = new TableBucket(1L, 0); TableBucket tb2 = new TableBucket(1L, 1); - Map plan = new HashMap<>(); + Map plan = new LinkedHashMap<>(); plan.put( tb1, new RebalancePlanForBucket( @@ -299,6 +339,205 @@ void testTimeoutTreatsTaskAsCompleted() throws Exception { manager.close(); } + @Test + void testRegisterRebalanceRespectsMaxInflightTasks() throws Exception { + ManualClock clock = new ManualClock(0L); + RecordingEventManager eventManager = new RecordingEventManager(); + NoOpScheduledExecutor executor = new NoOpScheduledExecutor(); + Configuration conf = new Configuration(); + conf.set(ConfigOptions.COORDINATOR_REBALANCE_MAX_INFLIGHT_TASKS, 2); + RecordingCoordinatorEventProcessor eventProcessor = + buildRecordingCoordinatorEventProcessor(conf); + RebalanceManager manager = + new RebalanceManager( + eventProcessor, zookeeperClient, eventManager, clock, conf, executor); + + Map plan = createRebalancePlan(5); + List buckets = new ArrayList<>(plan.keySet()); + zookeeperClient.registerRebalanceTask( + new RebalanceTask("inflight-test", NOT_STARTED, plan)); + + manager.registerRebalance("inflight-test", plan, NOT_STARTED); + + assertThat(eventProcessor.executedPlans).hasSize(2); + assertThat(countStatus(manager, RebalanceStatus.REBALANCING)).isEqualTo(2); + assertThat(countStatus(manager, RebalanceStatus.NOT_STARTED)).isEqualTo(3); + + manager.finishRebalanceTask(buckets.get(0), COMPLETED); + + assertThat(eventProcessor.executedPlans).hasSize(3); + assertThat(eventProcessor.executedPlans.get(2).getTableBucket()).isEqualTo(buckets.get(2)); + assertThat(countStatus(manager, RebalanceStatus.REBALANCING)).isEqualTo(2); + assertThat(countStatus(manager, RebalanceStatus.NOT_STARTED)).isEqualTo(2); + + manager.close(); + } + + @Test + void testIncreaseMaxInflightRebalanceTasksStartsMoreTasks() throws Exception { + ManualClock clock = new ManualClock(0L); + RecordingEventManager eventManager = new RecordingEventManager(); + NoOpScheduledExecutor executor = new NoOpScheduledExecutor(); + Configuration conf = new Configuration(); + conf.set(ConfigOptions.COORDINATOR_REBALANCE_MAX_INFLIGHT_TASKS, 1); + RecordingCoordinatorEventProcessor eventProcessor = + buildRecordingCoordinatorEventProcessor(conf); + RebalanceManager manager = + new RebalanceManager( + eventProcessor, zookeeperClient, eventManager, clock, conf, executor); + + Map plan = createRebalancePlan(4); + zookeeperClient.registerRebalanceTask( + new RebalanceTask("increase-inflight-test", NOT_STARTED, plan)); + manager.registerRebalance("increase-inflight-test", plan, NOT_STARTED); + assertThat(eventProcessor.executedPlans).hasSize(1); + + Configuration newConfig = new Configuration(conf); + newConfig.set(ConfigOptions.COORDINATOR_REBALANCE_MAX_INFLIGHT_TASKS, 3); + manager.reconfigure(newConfig); + + assertThat(manager.getMaxInflightRebalanceTasks()).isEqualTo(1); + assertThat(eventProcessor.executedPlans).hasSize(1); + applyLatestMaxInflightTasksChangedEvent(eventManager, manager, 3); + + assertThat(manager.getMaxInflightRebalanceTasks()).isEqualTo(3); + assertThat(eventProcessor.executedPlans).hasSize(3); + assertThat(countStatus(manager, RebalanceStatus.REBALANCING)).isEqualTo(3); + assertThat(countStatus(manager, RebalanceStatus.NOT_STARTED)).isEqualTo(1); + + manager.close(); + } + + @Test + void testZeroMaxInflightRebalanceTasksPausesAndResumesScheduling() throws Exception { + ManualClock clock = new ManualClock(0L); + RecordingEventManager eventManager = new RecordingEventManager(); + NoOpScheduledExecutor executor = new NoOpScheduledExecutor(); + Configuration conf = new Configuration(); + conf.set(ConfigOptions.COORDINATOR_REBALANCE_MAX_INFLIGHT_TASKS, 0); + RecordingCoordinatorEventProcessor eventProcessor = + buildRecordingCoordinatorEventProcessor(conf); + RebalanceManager manager = + new RebalanceManager( + eventProcessor, zookeeperClient, eventManager, clock, conf, executor); + + Map plan = createRebalancePlan(4); + zookeeperClient.registerRebalanceTask( + new RebalanceTask("paused-inflight-test", NOT_STARTED, plan)); + manager.registerRebalance("paused-inflight-test", plan, NOT_STARTED); + + assertThat(manager.getMaxInflightRebalanceTasks()).isZero(); + assertThat(eventProcessor.executedPlans).isEmpty(); + assertThat(countStatus(manager, RebalanceStatus.NOT_STARTED)).isEqualTo(4); + assertThat(manager.hasInProgressRebalance()).isTrue(); + + Configuration newConfig = new Configuration(conf); + newConfig.set(ConfigOptions.COORDINATOR_REBALANCE_MAX_INFLIGHT_TASKS, 2); + manager.reconfigure(newConfig); + + assertThat(manager.getMaxInflightRebalanceTasks()).isZero(); + assertThat(eventProcessor.executedPlans).isEmpty(); + applyLatestMaxInflightTasksChangedEvent(eventManager, manager, 2); + + assertThat(manager.getMaxInflightRebalanceTasks()).isEqualTo(2); + assertThat(eventProcessor.executedPlans).hasSize(2); + assertThat(countStatus(manager, RebalanceStatus.REBALANCING)).isEqualTo(2); + assertThat(countStatus(manager, RebalanceStatus.NOT_STARTED)).isEqualTo(2); + + manager.close(); + } + + @Test + void testDecreaseMaxInflightRebalanceTasksToZeroDoesNotCancelRunningTasks() throws Exception { + ManualClock clock = new ManualClock(0L); + RecordingEventManager eventManager = new RecordingEventManager(); + NoOpScheduledExecutor executor = new NoOpScheduledExecutor(); + Configuration conf = new Configuration(); + conf.set(ConfigOptions.COORDINATOR_REBALANCE_MAX_INFLIGHT_TASKS, 3); + RecordingCoordinatorEventProcessor eventProcessor = + buildRecordingCoordinatorEventProcessor(conf); + RebalanceManager manager = + new RebalanceManager( + eventProcessor, zookeeperClient, eventManager, clock, conf, executor); + + Map plan = createRebalancePlan(5); + List buckets = new ArrayList<>(plan.keySet()); + zookeeperClient.registerRebalanceTask( + new RebalanceTask("decrease-inflight-test", NOT_STARTED, plan)); + manager.registerRebalance("decrease-inflight-test", plan, NOT_STARTED); + assertThat(eventProcessor.executedPlans).hasSize(3); + + Configuration newConfig = new Configuration(conf); + newConfig.set(ConfigOptions.COORDINATOR_REBALANCE_MAX_INFLIGHT_TASKS, 0); + manager.reconfigure(newConfig); + + assertThat(manager.getMaxInflightRebalanceTasks()).isEqualTo(3); + applyLatestMaxInflightTasksChangedEvent(eventManager, manager, 0); + + assertThat(manager.getMaxInflightRebalanceTasks()).isZero(); + assertThat(eventProcessor.executedPlans).hasSize(3); + manager.finishRebalanceTask(buckets.get(0), COMPLETED); + manager.finishRebalanceTask(buckets.get(1), COMPLETED); + assertThat(eventProcessor.executedPlans).hasSize(3); + + manager.finishRebalanceTask(buckets.get(2), COMPLETED); + + assertThat(eventProcessor.executedPlans).hasSize(3); + assertThat(countStatus(manager, RebalanceStatus.NOT_STARTED)).isEqualTo(2); + assertThat(countStatus(manager, RebalanceStatus.REBALANCING)).isZero(); + + newConfig.set(ConfigOptions.COORDINATOR_REBALANCE_MAX_INFLIGHT_TASKS, 1); + manager.reconfigure(newConfig); + + assertThat(eventProcessor.executedPlans).hasSize(3); + applyLatestMaxInflightTasksChangedEvent(eventManager, manager, 1); + + assertThat(eventProcessor.executedPlans).hasSize(4); + assertThat(eventProcessor.executedPlans.get(3).getTableBucket()).isEqualTo(buckets.get(3)); + assertThat(countStatus(manager, RebalanceStatus.REBALANCING)).isEqualTo(1); + + manager.close(); + } + + @Test + void testTimeoutEnqueuesEventsForAllInflightTasks() throws Exception { + ManualClock clock = new ManualClock(0L); + RecordingEventManager eventManager = new RecordingEventManager(); + NoOpScheduledExecutor executor = new NoOpScheduledExecutor(); + Configuration conf = new Configuration(); + conf.set(ConfigOptions.COORDINATOR_REBALANCE_MAX_INFLIGHT_TASKS, 2); + RecordingCoordinatorEventProcessor eventProcessor = + buildRecordingCoordinatorEventProcessor(conf); + RebalanceManager manager = + new RebalanceManager( + eventProcessor, zookeeperClient, eventManager, clock, conf, executor); + + Map plan = createRebalancePlan(3); + List buckets = new ArrayList<>(plan.keySet()); + zookeeperClient.registerRebalanceTask( + new RebalanceTask("timeout-all-inflight-test", NOT_STARTED, plan)); + manager.registerRebalance("timeout-all-inflight-test", plan, NOT_STARTED); + assertThat(eventProcessor.executedPlans).hasSize(2); + + clock.advanceTime(Duration.ofMillis(130_000)); + manager.checkTimeout(); + + assertThat(eventManager.events).hasSize(2); + assertThat(((RebalanceTaskTimeoutEvent) eventManager.events.get(0)).getTableBucket()) + .isEqualTo(buckets.get(0)); + assertThat(((RebalanceTaskTimeoutEvent) eventManager.events.get(1)).getTableBucket()) + .isEqualTo(buckets.get(1)); + + manager.finishRebalanceTask(buckets.get(0), TIMEOUT); + manager.finishRebalanceTask(buckets.get(1), TIMEOUT); + assertThat(eventProcessor.executedPlans).hasSize(3); + + manager.checkTimeout(); + assertThat(eventManager.events).hasSize(2); + + manager.close(); + } + private CoordinatorEventProcessor buildCoordinatorEventProcessor(Configuration conf) { return new CoordinatorEventProcessor( zookeeperClient, @@ -315,6 +554,89 @@ private CoordinatorEventProcessor buildCoordinatorEventProcessor(Configuration c SystemClock.getInstance()); } + private RecordingCoordinatorEventProcessor buildRecordingCoordinatorEventProcessor( + Configuration conf) { + return new RecordingCoordinatorEventProcessor( + zookeeperClient, + serverMetadataCache, + testCoordinatorChannelManager, + new CoordinatorContext(zkEpoch), + autoPartitionManager, + lakeTableTieringManager, + conf, + metadataManager, + kvSnapshotLeaseManager); + } + + private static Map createRebalancePlan(int bucketCount) { + Map plan = new LinkedHashMap<>(); + for (int bucketId = 0; bucketId < bucketCount; bucketId++) { + TableBucket tableBucket = new TableBucket(1L, bucketId); + plan.put( + tableBucket, + new RebalancePlanForBucket( + tableBucket, 0, 0, Arrays.asList(0, 1, 2), Arrays.asList(0, 1, 2))); + } + return plan; + } + + private static int countStatus(RebalanceManager manager, RebalanceStatus status) { + int count = 0; + for (RebalanceResultForBucket result : + manager.listRebalanceProgress(null).progressForBucketMap().values()) { + if (result.status() == status) { + count++; + } + } + return count; + } + + private static void applyLatestMaxInflightTasksChangedEvent( + RecordingEventManager eventManager, RebalanceManager manager, int expectedMaxInflight) { + CoordinatorEvent event = eventManager.events.get(eventManager.events.size() - 1); + assertThat(event).isInstanceOf(RebalanceMaxInflightTasksChangedEvent.class); + RebalanceMaxInflightTasksChangedEvent changedEvent = + (RebalanceMaxInflightTasksChangedEvent) event; + assertThat(changedEvent.getMaxInflightTasks()).isEqualTo(expectedMaxInflight); + manager.updateMaxInflightRebalanceTasks(changedEvent.getMaxInflightTasks()); + } + + private static final class RecordingCoordinatorEventProcessor + extends CoordinatorEventProcessor { + private final List executedPlans = new ArrayList<>(); + + private RecordingCoordinatorEventProcessor( + ZooKeeperClient zooKeeperClient, + CoordinatorMetadataCache serverMetadataCache, + TestCoordinatorChannelManager testCoordinatorChannelManager, + CoordinatorContext coordinatorContext, + AutoPartitionManager autoPartitionManager, + LakeTableTieringManager lakeTableTieringManager, + Configuration conf, + MetadataManager metadataManager, + KvSnapshotLeaseManager kvSnapshotLeaseManager) { + super( + zooKeeperClient, + serverMetadataCache, + testCoordinatorChannelManager, + coordinatorContext, + autoPartitionManager, + lakeTableTieringManager, + TestingMetricGroups.COORDINATOR_METRICS, + conf, + Executors.newFixedThreadPool( + 1, new ExecutorThreadFactory("recording-coordinator-io")), + metadataManager, + kvSnapshotLeaseManager, + SystemClock.getInstance()); + } + + @Override + public void tryToExecuteRebalanceTask(RebalancePlanForBucket planForBucket) { + executedPlans.add(planForBucket); + } + } + /** Records events put into the coordinator event queue. */ private static final class RecordingEventManager implements EventManager { final List events = new ArrayList<>(); diff --git a/website/docs/maintenance/configuration.md b/website/docs/maintenance/configuration.md index 0a4223074d..96e61578ca 100644 --- a/website/docs/maintenance/configuration.md +++ b/website/docs/maintenance/configuration.md @@ -21,8 +21,8 @@ auto-partition.check.interval: 5min ``` Server configuration refers to a set of configurations used to specify the running parameters of a server. -These settings can only be configured at the time of cluster startup and do not support dynamic modification -during the Fluss cluster working. +Most settings are parsed when the Fluss processes start and require restarting the relevant processes to take effect. +Some server configurations can be updated dynamically while the cluster is running. See [Updating Cluster Configs](operations/updating-configs.md#updating-cluster-configs) for the supported dynamic options. ## Common @@ -56,13 +56,14 @@ during the Fluss cluster working. ## CoordinatorServer -| Option | Type | Default | Description | -|----------------------------------------------------|------------|-----------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| coordinator.io-pool.size | Integer | 10 | **Deprecated**: This option is deprecated. Please use `server.io-pool.size` instead. The size of the IO thread pool to run blocking operations for coordinator server. This includes discard unnecessary snapshot files. Increase this value if you experience slow unnecessary snapshot files clean. The default value is 10. | -| coordinator.producer-offsets.ttl | Duration | 24h | The TTL (time-to-live) for producer offsets. Producer offsets older than this TTL will be automatically cleaned up by the coordinator server. Producer offsets are used for undo recovery when a Flink job fails over before completing its first checkpoint. The default value is 24 hours. | -| coordinator.producer-offsets.cleanup-interval | Duration | 1h | The interval for cleaning up expired producer offsets and orphan files in remote storage. The cleanup task runs periodically to remove expired offsets and any orphan files that may have been left behind due to incomplete operations. The default value is 1 hour. | -| coordinator.lifecycle-throttler.inflight-timeout | Duration | 3min | The timeout for an in-flight drop event in the coordinator's TableLifecycleThrottler. If a drop event has been admitted but the corresponding completion callback has not arrived within this timeout, the throttler abandons tracking of that drop and continues admitting the next pending drop. | +| Option | Type | Default | Description | +|--------------------------------------------------------|------------|-----------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| coordinator.io-pool.size | Integer | 10 | **Deprecated**: This option is deprecated. Please use `server.io-pool.size` instead. The size of the IO thread pool to run blocking operations for coordinator server. This includes discard unnecessary snapshot files. Increase this value if you experience slow unnecessary snapshot files clean. The default value is 10. | +| coordinator.producer-offsets.ttl | Duration | 24h | The TTL (time-to-live) for producer offsets. Producer offsets older than this TTL will be automatically cleaned up by the coordinator server. Producer offsets are used for undo recovery when a Flink job fails over before completing its first checkpoint. The default value is 24 hours. | +| coordinator.producer-offsets.cleanup-interval | Duration | 1h | The interval for cleaning up expired producer offsets and orphan files in remote storage. The cleanup task runs periodically to remove expired offsets and any orphan files that may have been left behind due to incomplete operations. The default value is 1 hour. | +| coordinator.lifecycle-throttler.inflight-timeout | Duration | 3min | The timeout for an in-flight drop event in the coordinator's TableLifecycleThrottler. If a drop event has been admitted but the corresponding completion callback has not arrived within this timeout, the throttler abandons tracking of that drop and continues admitting the next pending drop. | | coordinator.lifecycle-throttler.timeout-check-interval | Duration | 1min | The periodic interval at which the coordinator's TableLifecycleThrottler scans in-flight drops for timeouts. | +| coordinator.rebalance.max-inflight-tasks | Integer | 1 | The maximum number of bucket-level rebalance tasks that can be executed concurrently by the coordinator. A higher value can speed up rebalance, while a lower value reduces the number of simultaneous bucket movements. Setting it to 0 pauses scheduling new rebalance tasks; already in-flight tasks continue until they complete or time out. The value must be non-negative. | ## TabletServer diff --git a/website/docs/maintenance/operations/rebalance.md b/website/docs/maintenance/operations/rebalance.md index 418b7c585f..7be56bbf92 100644 --- a/website/docs/maintenance/operations/rebalance.md +++ b/website/docs/maintenance/operations/rebalance.md @@ -112,7 +112,15 @@ Rebalance statuses: - **CANCELED**: The rebalance has been canceled - **TIMEOUT**: The rebalance task timed out (e.g., ISR could not converge within the timeout period) -### 4. Cancel Rebalance (If Needed) +### 4. Control Rebalance Speed + +The coordinator controls rebalance parallelism with `coordinator.rebalance.max-inflight-tasks`, which limits how many bucket-level rebalance tasks can run at the same time. The default value is `1`, which keeps rebalance conservative. + +Increase the value to speed up rebalance when the cluster has enough network, disk, and TabletServer capacity. Decrease the value to reduce the number of simultaneous bucket movements. Setting it to `0` pauses scheduling new bucket-level rebalance tasks; tasks that are already in flight continue until they complete or time out. Set it back to a positive value to resume scheduling pending tasks. + +This option can be updated dynamically through cluster configuration updates. See [Updating Cluster Configs](updating-configs.md#updating-cluster-configs) for the Java and Flink SQL APIs. + +### 5. Cancel Rebalance (If Needed) Cancel an ongoing rebalance operation if necessary: @@ -129,7 +137,7 @@ admin.cancelRebalance(null).get(); - Already completed bucket migrations will not be rolled back - After cancellation, the rebalance status will change to `CANCELED` -### 5. Remove Server Tags (After Completion) +### 6. Remove Server Tags (After Completion) After rebalance completes and maintenance is done, remove server tags to restore normal operation: @@ -272,11 +280,12 @@ If `RACK_AWARE` is not placed first, replica movements generated by earlier goal 1. **Plan Ahead**: Tag servers appropriately before triggering rebalance to guide the algorithm 2. **Monitor Progress**: Regularly check rebalance status to ensure smooth operation 3. **Off-Peak Hours**: Schedule rebalance operations during off-peak hours to minimize impact -4. **Single Rebalance**: Fluss supports only one active rebalance task at a time in the cluster -5. **Backup First**: For production environments, ensure data is backed up before major topology changes -6. **Goal Priority**: Order rebalance goals by priority - the system attempts to achieve them in order -7. **Server Tags**: Use `TEMPORARY_OFFLINE` for maintenance scenarios to allow buckets to return after maintenance -8. **Rack Awareness First**: In multi-rack deployments, always place `RACK_AWARE` as the first goal +4. **Single Rebalance**: Fluss supports only one active rebalance operation at a time in the cluster +5. **Rebalance Parallelism**: Tune `coordinator.rebalance.max-inflight-tasks` based on cluster capacity. Use `0` to pause scheduling new bucket-level rebalance tasks. +6. **Backup First**: For production environments, ensure data is backed up before major topology changes +7. **Goal Priority**: Order rebalance goals by priority - the system attempts to achieve them in order +8. **Server Tags**: Use `TEMPORARY_OFFLINE` for maintenance scenarios to allow buckets to return after maintenance +9. **Rack Awareness First**: In multi-rack deployments, always place `RACK_AWARE` as the first goal ## Troubleshooting @@ -293,6 +302,7 @@ If rebalance is taking longer than expected: - Check network bandwidth between TabletServers - Verify disk I/O performance on TabletServers - Monitor cluster load and resource utilization +- Consider increasing `coordinator.rebalance.max-inflight-tasks` if the cluster has enough capacity - Consider canceling and retrying with fewer goals ### Rebalance Stuck in REBALANCING Status diff --git a/website/docs/maintenance/operations/updating-configs.md b/website/docs/maintenance/operations/updating-configs.md index 6ab797239f..553125cbc2 100644 --- a/website/docs/maintenance/operations/updating-configs.md +++ b/website/docs/maintenance/operations/updating-configs.md @@ -21,6 +21,7 @@ Currently, the supported dynamically updatable server configurations include: - `datalake.format`: Specify the lakehouse format, e.g., `paimon`, `iceberg`. When enabling lakehouse storage explicitly, use it together with `datalake.enabled = true`. - Options with prefix `datalake.${datalake.format}` - `kv.rocksdb.shared-rate-limiter.bytes-per-sec`: Control RocksDB flush and compaction write rate shared across all RocksDB instances on the TabletServer. The rate limiter is always enabled. Set to a lower value (e.g., 100MB) to limit the rate, or a very high value to effectively disable rate limiting. +- `coordinator.rebalance.max-inflight-tasks`: Control the maximum number of bucket-level rebalance tasks that can run concurrently. Set it to `0` to pause scheduling new rebalance tasks; already in-flight tasks continue until they complete or time out. You can update the configuration of a cluster with [Java client](#using-java-client) or [Flink SQL](#using-flink-sql).