diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
index 0301d3d47d..9c16e1980f 100644
--- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
+++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
@@ -221,6 +221,16 @@ public class ConfigOptions {
                                     + "TableLifecycleThrottler scans in-flight drops for "
                                     + "timeouts.");
 
+    public static final ConfigOption<Duration> COORDINATOR_OFFLINE_LEADER_RETRY_INTERVAL =
+            key("coordinator.offline-leader.retry-interval")
+                    .durationType()
+                    .defaultValue(Duration.ofMinutes(1))
+                    .withDescription(
+                            "The interval at which the coordinator retries offline leaders on "
+                                    + "live tablet servers. This lets a leader that was rejected "
+                                    + "because of temporary tablet-server conditions, such as disk "
+                                    + "write protection, become electable again after recovery.");
+
     public static final ConfigOption<Boolean> LOG_TABLE_ALLOW_CREATION =
             key("allow.create.log.tables")
                     .booleanType()
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java
index dc0fa58a3f..348dd9694e 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java
@@ -225,6 +225,41 @@ public void removeOfflineBucketInServer(int serverId) {
         replicasOnOffline.remove(serverId);
     }
 
+    /** Removes the offline marker for one table bucket on the given tablet server.
*/
+    public void removeOfflineBucketInServer(TableBucket tableBucket, int serverId) {
+        Set<TableBucket> tableBuckets = replicasOnOffline.get(serverId);
+        if (tableBuckets == null) {
+            return;
+        }
+        tableBuckets.remove(tableBucket);
+        if (tableBuckets.isEmpty()) {
+            replicasOnOffline.remove(serverId);
+        }
+    }
+
+    /**
+     * Returns offline replicas that are on live tablet servers.
+     *
+     * <p>The {@code replicasOnOffline} map is part of {@link #isReplicaOnline(int, TableBucket)},
+     * so replicas in it are filtered out from leader election even when their tablet server is
+     * still live. The coordinator periodically probes these replicas again because some failures,
+     * such as disk write protection, can recover without a tablet-server restart.
+     */
+    public Set<TableBucketReplica> offlineReplicasOnLiveTabletServers() {
+        Set<Integer> liveTabletServers = liveTabletServerSet();
+        Set<TableBucketReplica> offlineReplicas = new HashSet<>();
+        for (Map.Entry<Integer, Set<TableBucket>> entry : replicasOnOffline.entrySet()) {
+            int serverId = entry.getKey();
+            if (!liveTabletServers.contains(serverId)) {
+                continue;
+            }
+            for (TableBucket tableBucket : entry.getValue()) {
+                offlineReplicas.add(new TableBucketReplica(tableBucket, serverId));
+            }
+        }
+        return offlineReplicas;
+    }
+
     // ---- Pending leader activation tracking (for Cluster Health API) ----
 
     public void addPendingLeaderActivation(TableBucket bucket) {
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
index 537e2df63f..9b4060c139 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
@@ -88,6 +88,7 @@
 import org.apache.fluss.server.coordinator.event.RebalanceTaskTimeoutEvent;
 import org.apache.fluss.server.coordinator.event.RemoveServerTagEvent;
 import org.apache.fluss.server.coordinator.event.ResumeDropEvent;
+import org.apache.fluss.server.coordinator.event.RetryOfflineLeaderEvent;
 import org.apache.fluss.server.coordinator.event.SchemaChangeEvent;
 import org.apache.fluss.server.coordinator.event.TableRegistrationChangeEvent;
 import org.apache.fluss.server.coordinator.event.watcher.CoordinatorChangeWatcher;
@@ -126,6 +127,7 @@
 import
org.apache.fluss.utils.AutoPartitionStrategy;
 import org.apache.fluss.utils.clock.Clock;
 import org.apache.fluss.utils.clock.SystemClock;
+import org.apache.fluss.utils.concurrent.ExecutorThreadFactory;
 import org.apache.fluss.utils.types.Tuple2;
 
 import org.slf4j.Logger;
@@ -146,6 +148,9 @@
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static org.apache.fluss.server.coordinator.statemachine.BucketState.OfflineBucket;
@@ -188,6 +193,8 @@ public class CoordinatorEventProcessor implements EventProcessor {
     private final String internalListenerName;
     private final CoordinatorMetricGroup coordinatorMetricGroup;
     private final RebalanceManager rebalanceManager;
+    private final ScheduledExecutorService offlineLeaderRetryScheduler;
+    private final long offlineLeaderRetryIntervalMs;
     private final CompletedSnapshotStoreManager completedSnapshotStoreManager;
     private final LakeTableHelper lakeTableHelper;
 
@@ -262,6 +269,18 @@ public CoordinatorEventProcessor(
         this.rebalanceManager =
                 new RebalanceManager(
                         this, zooKeeperClient, coordinatorEventManager, SystemClock.getInstance());
+        this.offlineLeaderRetryIntervalMs =
+                conf.get(ConfigOptions.COORDINATOR_OFFLINE_LEADER_RETRY_INTERVAL).toMillis();
+        if (offlineLeaderRetryIntervalMs <= 0) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "%s must be positive, but was %d ms.",
+                            ConfigOptions.COORDINATOR_OFFLINE_LEADER_RETRY_INTERVAL.key(),
+                            offlineLeaderRetryIntervalMs));
+        }
+        this.offlineLeaderRetryScheduler =
+                Executors.newSingleThreadScheduledExecutor(
+                        new ExecutorThreadFactory("offline-leader-retry"));
         this.ioExecutor = ioExecutor;
         this.lakeTableHelper =
                 new LakeTableHelper(zooKeeperClient, conf.getString(ConfigOptions.REMOTE_DATA_DIR));
@@ -322,9 +341,12 @@ public void
startup() {
         // start rebalance manager.
         rebalanceManager.startup();
         rebalanceManager.start();
+
+        startOfflineLeaderRetryScheduler();
     }
 
     public void shutdown() {
+        offlineLeaderRetryScheduler.shutdownNow();
         // close the event manager
         coordinatorEventManager.close();
         rebalanceManager.close();
@@ -587,6 +609,32 @@ private void onShutdown() {
         tabletServerChangeWatcher.stop();
     }
 
+    /** Starts the periodic task that enqueues a {@link RetryOfflineLeaderEvent}. */
+    private void startOfflineLeaderRetryScheduler() {
+        offlineLeaderRetryScheduler.scheduleWithFixedDelay(
+                this::enqueueRetryOfflineLeaderEventSafely,
+                offlineLeaderRetryIntervalMs,
+                offlineLeaderRetryIntervalMs,
+                TimeUnit.MILLISECONDS);
+        LOG.info(
+                "Offline leader retry scheduler started: retryIntervalMs={}",
+                offlineLeaderRetryIntervalMs);
+    }
+
+    /**
+     * Enqueues a {@link RetryOfflineLeaderEvent}, logging any failure instead of propagating it.
+     *
+     * <p>{@code scheduleWithFixedDelay} suppresses all subsequent executions once a run throws, so
+     * nothing is allowed to escape from this task.
+     */
+    private void enqueueRetryOfflineLeaderEventSafely() {
+        try {
+            coordinatorEventManager.put(new RetryOfflineLeaderEvent());
+        } catch (Throwable t) {
+            LOG.warn("Failed to enqueue retry offline leader event.", t);
+        }
+    }
+
     @Override
     public void process(CoordinatorEvent event) {
         if (event instanceof CreateTableEvent) {
@@ -605,6 +653,8 @@ public void process(CoordinatorEvent event) {
         } else if (event instanceof NotifyLeaderAndIsrResponseReceivedEvent) {
             processNotifyLeaderAndIsrResponseReceivedEvent(
                     (NotifyLeaderAndIsrResponseReceivedEvent) event);
+        } else if (event instanceof RetryOfflineLeaderEvent) {
+            processRetryOfflineLeader();
         } else if (event instanceof DeleteReplicaResponseReceivedEvent) {
             processDeleteReplicaResponseReceived((DeleteReplicaResponseReceivedEvent) event);
         } else if (event instanceof NewCoordinatorEvent) {
@@ -702,6 +752,48 @@ public void process(CoordinatorEvent event) {
         }
     }
 
+    /**
+     * Retries offline replicas that are hosted on tablet servers which are still live.
+     *
+     * <p>Replicas of buckets that are to be deleted, and replicas whose state is not {@code
+     * OfflineReplica}, are left untouched.
+     */
+    private void processRetryOfflineLeader() {
+        Set<TableBucketReplica> offlineReplicas =
+                coordinatorContext.offlineReplicasOnLiveTabletServers().stream()
+                        .filter(
+                                replica ->
+                                        !coordinatorContext.isToBeDeleted(replica.getTableBucket()))
+                        .filter(
+                                replica ->
+                                        coordinatorContext.getReplicaState(replica)
+                                                == OfflineReplica)
+                        .collect(Collectors.toSet());
+
+        if (offlineReplicas.isEmpty()) {
+            return;
+        }
+
+        LOG.info(
+                "Retrying {} offline replicas on live tablet servers before triggering "
+                        + "offline leader election: {}.",
+                offlineReplicas.size(),
+                offlineReplicas);
+
+        // replicasOnOffline is checked by CoordinatorContext#isReplicaOnline and therefore also
+        // by leader election. Remove the marker before probing the live server again; if the
+        // server still cannot become leader, NotifyLeaderAndIsr will fail and the replica will be
+        // put back to OfflineReplica. TODO: once standby replicas maintain local KV snapshots,
+        // distinguish standby promotion from fresh snapshot download when retrying KV leaders.
+        for (TableBucketReplica offlineReplica : offlineReplicas) {
+            coordinatorContext.removeOfflineBucketInServer(
+                    offlineReplica.getTableBucket(), offlineReplica.getReplica());
+        }
+
+        replicaStateMachine.handleStateChanges(offlineReplicas, OnlineReplica);
+        tableBucketStateMachine.triggerOnlineBucketStateChange();
+    }
+
     private void processCreateTable(CreateTableEvent createTableEvent) {
         long tableId = createTableEvent.getTableInfo().getTableId();
         // skip the table if it already exists
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/RetryOfflineLeaderEvent.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/RetryOfflineLeaderEvent.java
new file mode 100644
index 0000000000..89d8d28586
--- /dev/null
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/RetryOfflineLeaderEvent.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.event;
+
+/** An event for retrying offline leaders on live tablet servers. */
+public class RetryOfflineLeaderEvent implements CoordinatorEvent {}
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
index 280e34124d..29adf341d9 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
@@ -1824,7 +1824,11 @@ protected boolean validateAndGetIsBecomeLeader(NotifyLeaderAndIsrData data) {
             if (requestLeaderEpoch >= currentLeaderEpoch) {
                 if (data.getReplicas().contains(serverId)) {
                     int leaderId = data.getLeader();
-                    return leaderId == serverId;
+                    boolean becomeLeader = leaderId == serverId;
+                    if (becomeLeader) {
+                        ensureWritableForNewKvLeader(replica, requestLeaderEpoch);
+                    }
+                    return becomeLeader;
                 } else {
                     String errorMessage =
                             String.format(
@@ -1845,6 +1849,24 @@ protected boolean validateAndGetIsBecomeLeader(NotifyLeaderAndIsrData data) {
         }
     }
 
+    /**
+     * Calls {@code localDiskManager.ensureWritable()} before this server becomes leader of a KV
+     * replica under a newer leader epoch.
+     *
+     * <p>Non-KV replicas, and requests whose leader epoch is not greater than the replica's
+     * current leader epoch, return without performing the check.
+     */
+    private void ensureWritableForNewKvLeader(Replica replica, int requestLeaderEpoch) {
+        if (!replica.isKvTable() || requestLeaderEpoch <= replica.getLeaderEpoch()) {
+            return;
+        }
+
+        // TODO: Once standby replicas maintain local KV snapshots, allow promoting a standby
+        // replica while disk write protection is active because it should not need to download a
+        // large remote snapshot during make-leader.
+        localDiskManager.ensureWritable();
+    }
+
     /**
      * If all the following conditions are true, we need to put a delayed write operation into the
      * delayed write manager and wait for replication to complete.
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorContextTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorContextTest.java
index b69fe5fa3b..557d75a6a4 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorContextTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorContextTest.java
@@ -22,6 +22,7 @@
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.metadata.Schema;
 import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableBucketReplica;
 import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
@@ -78,6 +79,41 @@ void testGetLakeTableCount() {
         assertThat(context.getLakeTableCount()).isEqualTo(2);
     }
 
+    @Test
+    void testOfflineReplicasOnLiveTabletServersOnlyReturnsLiveServers() {
+        CoordinatorContext context = new CoordinatorContext(ZkEpoch.INITIAL_EPOCH);
+        TableBucket liveBucket = new TableBucket(1L, 0);
+        TableBucket deadBucket = new TableBucket(1L, 1);
+
+        context.setLiveTabletServers(Arrays.asList(createTabletServer(0), createTabletServer(1)));
+        context.addOfflineBucketInServer(liveBucket, 0);
+        context.addOfflineBucketInServer(deadBucket, 2);
+
+        assertThat(context.offlineReplicasOnLiveTabletServers())
+                .containsExactly(new TableBucketReplica(liveBucket, 0));
+    }
+
+    @Test
+    void testRemoveOfflineBucketInServerForBucket() {
+        CoordinatorContext context = new CoordinatorContext(ZkEpoch.INITIAL_EPOCH);
+        TableBucket tb1 = new TableBucket(1L, 0);
+        TableBucket tb2 = new TableBucket(1L, 1);
+
+        context.setLiveTabletServers(Collections.singletonList(createTabletServer(0)));
+
context.addOfflineBucketInServer(tb1, 0);
+        context.addOfflineBucketInServer(tb2, 0);
+
+        context.removeOfflineBucketInServer(tb1, 0);
+        assertThat(context.isReplicaOnline(0, tb1)).isTrue();
+        assertThat(context.isReplicaOnline(0, tb2)).isFalse();
+        assertThat(context.offlineReplicasOnLiveTabletServers())
+                .containsExactly(new TableBucketReplica(tb2, 0));
+
+        context.removeOfflineBucketInServer(tb2, 0);
+        assertThat(context.isReplicaOnline(0, tb2)).isTrue();
+        assertThat(context.offlineReplicasOnLiveTabletServers()).isEmpty();
+    }
+
     // ---- Pending Leader Activation Tracking Tests ----
 
     @Test
@@ -215,4 +251,12 @@ private TableInfo createTableInfo(long tableId, TablePath tablePath, boolean isL
                 System.currentTimeMillis(),
                 System.currentTimeMillis());
     }
+
+    private ServerInfo createTabletServer(int serverId) {
+        return new ServerInfo(
+                serverId,
+                "RACK" + serverId,
+                Endpoint.fromListenersString("CLIENT://host" + serverId + ":9124"),
+                ServerType.TABLET_SERVER);
+    }
 }
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java
index 001a90f74e..bf8364d393 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java
@@ -46,12 +46,16 @@
 import org.apache.fluss.rpc.messages.NotifyLeaderAndIsrResponse;
 import org.apache.fluss.rpc.messages.NotifyRemoteLogOffsetsRequest;
 import org.apache.fluss.rpc.messages.UpdateMetadataRequest;
+import org.apache.fluss.rpc.protocol.ApiError;
 import org.apache.fluss.rpc.protocol.ApiKeys;
+import org.apache.fluss.rpc.protocol.Errors;
 import org.apache.fluss.server.coordinator.event.AccessContextEvent;
 import org.apache.fluss.server.coordinator.event.AdjustIsrReceivedEvent;
 import
org.apache.fluss.server.coordinator.event.CommitKvSnapshotEvent;
 import org.apache.fluss.server.coordinator.event.CommitRemoteLogManifestEvent;
 import org.apache.fluss.server.coordinator.event.CoordinatorEventManager;
+import org.apache.fluss.server.coordinator.event.NotifyLeaderAndIsrResponseReceivedEvent;
+import org.apache.fluss.server.coordinator.event.RetryOfflineLeaderEvent;
 import org.apache.fluss.server.coordinator.lease.KvSnapshotLeaseManager;
 import org.apache.fluss.server.coordinator.remote.RemoteDirDynamicLoader;
 import org.apache.fluss.server.coordinator.statemachine.BucketState;
@@ -59,6 +63,7 @@
 import org.apache.fluss.server.entity.AdjustIsrResultForBucket;
 import org.apache.fluss.server.entity.CommitKvSnapshotData;
 import org.apache.fluss.server.entity.CommitRemoteLogManifestData;
+import org.apache.fluss.server.entity.NotifyLeaderAndIsrResultForBucket;
 import org.apache.fluss.server.entity.TablePropertyChanges;
 import org.apache.fluss.server.kv.snapshot.CompletedSnapshot;
 import org.apache.fluss.server.kv.snapshot.ZooKeeperCompletedSnapshotHandleStore;
@@ -109,6 +114,7 @@
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -1112,6 +1118,228 @@ void testProcessAdjustIsr() throws Exception {
         assertThat(resultForBucketMap.values()).allMatch(AdjustIsrResultForBucket::succeeded);
     }
 
+    @Test
+    void testDiskWriteLockedNotifyLeaderResponseMarksReplicaOffline() throws Exception {
+        initCoordinatorChannel();
+        TablePath tablePath =
+                TablePath.of(defaultDatabase, "disk_write_locked_notify_leader_response");
+        int nBuckets = 3;
+        int replicationFactor = 3;
+        TableAssignment tableAssignment =
+                generateAssignment(
+                        nBuckets,
+                        replicationFactor,
+                        new TabletServerInfo[] {
+                            new TabletServerInfo(0, "rack0"),
+                            new TabletServerInfo(1, "rack1"),
+                            new TabletServerInfo(2, "rack2")
+                        });
+        long tableId =
+                metadataManager.createTable(
+                        tablePath, remoteDataDir, TEST_TABLE, tableAssignment, false);
+        verifyTableCreated(tableId, tableAssignment, nBuckets, replicationFactor);
+
+        TableBucket tableBucket = new TableBucket(tableId, 0);
+        int leader =
+                tableAssignment.getBucketAssignment(tableBucket.getBucket()).getReplicas().get(0);
+        TableBucketReplica tableBucketReplica = new TableBucketReplica(tableBucket, leader);
+
+        eventProcessor
+                .getCoordinatorEventManager()
+                .put(
+                        new NotifyLeaderAndIsrResponseReceivedEvent(
+                                Collections.singletonList(
+                                        new NotifyLeaderAndIsrResultForBucket(
+                                                tableBucket,
+                                                new ApiError(
+                                                        Errors.DISK_WRITE_LOCKED,
+                                                        "disk write locked"))),
+                                leader));
+
+        fromCtx(
+                ctx -> {
+                    assertThat(ctx.getReplicaState(tableBucketReplica)).isEqualTo(OfflineReplica);
+                    assertThat(ctx.isReplicaOnline(leader, tableBucket)).isFalse();
+                    return null;
+                });
+    }
+
+    @Test
+    void testRetryOfflineLeaderEventRetriesOfflineReplicaOnLiveServer() throws Exception {
+        initCoordinatorChannel();
+        TablePath tablePath = TablePath.of(defaultDatabase, "retry_offline_leader_on_live_server");
+        int nBuckets = 3;
+        int replicationFactor = 1;
+        TableAssignment tableAssignment =
+                generateAssignment(
+                        nBuckets,
+                        replicationFactor,
+                        new TabletServerInfo[] {new TabletServerInfo(0, "rack0")});
+        long tableId =
+                metadataManager.createTable(
+                        tablePath,
+                        remoteDataDir,
+                        TEST_TABLE.withReplicationFactor(replicationFactor),
+                        tableAssignment,
+                        false);
+        verifyTableCreated(tableId, tableAssignment, nBuckets, replicationFactor);
+
+        TableBucket tableBucket = new TableBucket(tableId, 0);
+        int leader =
+                tableAssignment.getBucketAssignment(tableBucket.getBucket()).getReplicas().get(0);
+        TableBucketReplica tableBucketReplica = new TableBucketReplica(tableBucket, leader);
+
+        eventProcessor
+                .getCoordinatorEventManager()
+                .put(
+                        new NotifyLeaderAndIsrResponseReceivedEvent(
+                                Collections.singletonList(
+                                        new NotifyLeaderAndIsrResultForBucket(
+                                                tableBucket,
+                                                new ApiError(
+                                                        Errors.DISK_WRITE_LOCKED,
+                                                        "disk write locked"))),
+                                leader));
+
+        retryVerifyContext(
+                ctx -> {
+                    assertThat(ctx.getReplicaState(tableBucketReplica)).isEqualTo(OfflineReplica);
+                    assertThat(ctx.getBucketState(tableBucket)).isEqualTo(OfflineBucket);
+                    assertThat(ctx.isReplicaOnline(leader, tableBucket)).isFalse();
+                });
+
+        eventProcessor.getCoordinatorEventManager().put(new RetryOfflineLeaderEvent());
+
+        retryVerifyContext(
+                ctx -> {
+                    assertThat(ctx.isReplicaOnline(leader, tableBucket)).isTrue();
+                    assertThat(ctx.getReplicaState(tableBucketReplica)).isEqualTo(OnlineReplica);
+                    assertThat(ctx.getBucketState(tableBucket)).isEqualTo(OnlineBucket);
+                    assertThat(ctx.getBucketLeaderAndIsr(tableBucket).get().leader())
+                            .isEqualTo(leader);
+                });
+    }
+
+    @Test
+    void testRetryOfflineLeaderEventKeepsReplicaOfflineWhenRetryStillFails() throws Exception {
+        initCoordinatorChannel();
+        TablePath tablePath = TablePath.of(defaultDatabase, "retry_offline_leader_still_fails");
+        int nBuckets = 3;
+        int replicationFactor = 1;
+        TableAssignment tableAssignment =
+                generateAssignment(
+                        nBuckets,
+                        replicationFactor,
+                        new TabletServerInfo[] {new TabletServerInfo(0, "rack0")});
+        long tableId =
+                metadataManager.createTable(
+                        tablePath,
+                        remoteDataDir,
+                        TEST_TABLE.withReplicationFactor(replicationFactor),
+                        tableAssignment,
+                        false);
+        verifyTableCreated(tableId, tableAssignment, nBuckets, replicationFactor);
+
+        TableBucket tableBucket = new TableBucket(tableId, 0);
+        int leader =
+                tableAssignment.getBucketAssignment(tableBucket.getBucket()).getReplicas().get(0);
+        TableBucketReplica tableBucketReplica = new TableBucketReplica(tableBucket, leader);
+
+        eventProcessor
+                .getCoordinatorEventManager()
+                .put(
+                        new NotifyLeaderAndIsrResponseReceivedEvent(
+                                Collections.singletonList(
+                                        new NotifyLeaderAndIsrResultForBucket(
+                                                tableBucket,
+                                                new ApiError(
+                                                        Errors.DISK_WRITE_LOCKED,
+                                                        "disk write locked"))),
+                                leader));
+
+        retryVerifyContext(
+                ctx -> {
+                    assertThat(ctx.getReplicaState(tableBucketReplica)).isEqualTo(OfflineReplica);
+                    assertThat(ctx.isReplicaOnline(leader, tableBucket)).isFalse();
+                });
+
+        CountingFailingNotifyGateway failingGateway = new CountingFailingNotifyGateway();
+        testCoordinatorChannelManager.setGateways(Collections.singletonMap(leader, failingGateway));
+
+        eventProcessor.getCoordinatorEventManager().put(new RetryOfflineLeaderEvent());
+
+        retry(
+                Duration.ofMinutes(1),
+                () -> assertThat(failingGateway.getNotifyLeaderAndIsrCount()).isGreaterThan(0));
+        retryVerifyContext(
+                ctx -> {
+                    assertThat(ctx.getReplicaState(tableBucketReplica)).isEqualTo(OfflineReplica);
+                    assertThat(ctx.getBucketState(tableBucket)).isEqualTo(OfflineBucket);
+                    assertThat(ctx.isReplicaOnline(leader, tableBucket)).isFalse();
+                });
+    }
+
+    @Test
+    void testRetryOfflineLeaderEventSkipsDeletedBucket() throws Exception {
+        initCoordinatorChannel();
+        TablePath tablePath = TablePath.of(defaultDatabase, "retry_offline_leader_deleted_bucket");
+        int nBuckets = 3;
+        int replicationFactor = 1;
+        TableAssignment tableAssignment =
+                generateAssignment(
+                        nBuckets,
+                        replicationFactor,
+                        new TabletServerInfo[] {new TabletServerInfo(0, "rack0")});
+        long tableId =
+                metadataManager.createTable(
+                        tablePath,
+                        remoteDataDir,
+                        TEST_TABLE.withReplicationFactor(replicationFactor),
+                        tableAssignment,
+                        false);
+        verifyTableCreated(tableId, tableAssignment, nBuckets, replicationFactor);
+
+        TableBucket tableBucket = new TableBucket(tableId, 0);
+        int leader =
+                tableAssignment.getBucketAssignment(tableBucket.getBucket()).getReplicas().get(0);
+        TableBucketReplica tableBucketReplica = new TableBucketReplica(tableBucket, leader);
+
+        eventProcessor
+                .getCoordinatorEventManager()
+                .put(
+                        new NotifyLeaderAndIsrResponseReceivedEvent(
+                                Collections.singletonList(
+                                        new NotifyLeaderAndIsrResultForBucket(
+                                                tableBucket,
+                                                new ApiError(
+                                                        Errors.DISK_WRITE_LOCKED,
+                                                        "disk write locked"))),
+                                leader));
+
+        retryVerifyContext(
+                ctx -> {
+                    assertThat(ctx.getReplicaState(tableBucketReplica)).isEqualTo(OfflineReplica);
+                    assertThat(ctx.isReplicaOnline(leader, tableBucket)).isFalse();
+                });
+
+        fromCtx(
+                ctx -> {
+                    ctx.queueTableDeletion(Collections.singleton(tableId));
+                    return null;
+                });
+
+        eventProcessor.getCoordinatorEventManager().put(new RetryOfflineLeaderEvent());
+
+        fromCtx(
+                ctx -> {
+                    assertThat(ctx.getReplicaState(tableBucketReplica)).isEqualTo(OfflineReplica);
+                    assertThat(ctx.isReplicaOnline(leader, tableBucket)).isFalse();
+                    assertThat(ctx.offlineReplicasOnLiveTabletServers())
+                            .contains(tableBucketReplica);
+                    return null;
+                });
+    }
+
     @Test
     void testSchemaChange() throws Exception {
         // make sure all request to gateway should be successful
@@ -1604,6 +1832,7 @@ private void verifyIsr(TableBucket tb, int expectedLeader, List<Integer> expecte
     private CoordinatorEventProcessor buildCoordinatorEventProcessor() {
         Configuration conf = new Configuration();
         conf.set(ConfigOptions.REMOTE_DATA_DIR, remoteDataDir);
+        conf.set(ConfigOptions.COORDINATOR_OFFLINE_LEADER_RETRY_INTERVAL, Duration.ofDays(1));
         return new CoordinatorEventProcessor(
                 zookeeperClient,
                 serverMetadataCache,
@@ -2001,6 +2230,25 @@ private int countInProgressRebalanceTasks(TableBucket...
buckets) {
         return count;
     }
 
+    private static class CountingFailingNotifyGateway extends TestTabletServerGateway {
+        private final AtomicInteger notifyLeaderAndIsrCount = new AtomicInteger();
+
+        CountingFailingNotifyGateway() {
+            super(true, Collections.emptySet());
+        }
+
+        int getNotifyLeaderAndIsrCount() {
+            return notifyLeaderAndIsrCount.get();
+        }
+
+        @Override
+        public CompletableFuture<NotifyLeaderAndIsrResponse> notifyLeaderAndIsr(
+                NotifyLeaderAndIsrRequest request) {
+            notifyLeaderAndIsrCount.incrementAndGet();
+            return super.notifyLeaderAndIsr(request);
+        }
+    }
+
     /**
      * A gateway that intercepts NotifyLeaderAndIsr calls for verifying sequential execution of
      * leader migrations. In pass-through mode, it delegates to the parent. In controlled mode, it
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java
index ff6b0246ee..6dd9ce7fe5 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java
@@ -555,6 +555,64 @@ tb, genKvRecordBatch(DATA_1_WITH_KEY_AND_VALUE)),
         replicaManager.getDiskUsageMonitor().update(0.10);
     }
 
+    @Test
+    void testNewKvLeaderRejectedWhenDiskLocked() throws Exception {
+        TableBucket kvTb = new TableBucket(DATA1_TABLE_ID_PK, 1);
+        TableBucket logTb = new TableBucket(DATA1_TABLE_ID, 1);
+
+        replicaManager.getDiskUsageMonitor().update(0.99);
+        assertThat(replicaManager.isDiskWriteLocked()).isTrue();
+
+        CompletableFuture<List<NotifyLeaderAndIsrResultForBucket>> future =
+                new CompletableFuture<>();
+        replicaManager.becomeLeaderOrFollower(
+                INITIAL_COORDINATOR_EPOCH,
+                Collections.singletonList(
+                        new NotifyLeaderAndIsrData(
+                                PhysicalTablePath.of(DATA1_TABLE_PATH_PK),
+                                kvTb,
+                                Collections.singletonList(TABLET_SERVER_ID),
+                                new LeaderAndIsr(
+                                        TABLET_SERVER_ID,
+                                        INITIAL_LEADER_EPOCH,
+                                        Collections.singletonList(TABLET_SERVER_ID),
+                                        Collections.emptyList(),
+                                        INITIAL_COORDINATOR_EPOCH,
+                                        INITIAL_BUCKET_EPOCH))),
+                future::complete);
+
+        List<NotifyLeaderAndIsrResultForBucket> results = future.get();
+        assertThat(results).hasSize(1);
+        NotifyLeaderAndIsrResultForBucket result = results.get(0);
+        assertThat(result.getError().error()).isEqualTo(Errors.DISK_WRITE_LOCKED);
+        assertThat(result.getError().messageWithFallback()).contains("data disk usage");
+        Replica kvReplica = replicaManager.getReplicaOrException(kvTb);
+        assertThat(kvReplica.isLeader()).isFalse();
+        assertThat(kvReplica.getKvTablet()).isNull();
+
+        future = new CompletableFuture<>();
+        replicaManager.becomeLeaderOrFollower(
+                INITIAL_COORDINATOR_EPOCH,
+                Collections.singletonList(
+                        new NotifyLeaderAndIsrData(
+                                PhysicalTablePath.of(DATA1_TABLE_PATH),
+                                logTb,
+                                Collections.singletonList(TABLET_SERVER_ID),
+                                new LeaderAndIsr(
+                                        TABLET_SERVER_ID,
+                                        INITIAL_LEADER_EPOCH,
+                                        Collections.singletonList(TABLET_SERVER_ID),
+                                        Collections.emptyList(),
+                                        INITIAL_COORDINATOR_EPOCH,
+                                        INITIAL_BUCKET_EPOCH))),
+                future::complete);
+
+        assertThat(future.get()).containsOnly(new NotifyLeaderAndIsrResultForBucket(logTb));
+        assertThat(replicaManager.getReplicaOrException(logTb).isLeader()).isTrue();
+
+        replicaManager.getDiskUsageMonitor().update(0.10);
+    }
+
     @Test
     void testPutKv() throws Exception {
         TableBucket tb = new TableBucket(DATA1_TABLE_ID_PK, 1);
diff --git a/website/docs/maintenance/configuration.md b/website/docs/maintenance/configuration.md
index 0a4223074d..590e461493 100644
--- a/website/docs/maintenance/configuration.md
+++ b/website/docs/maintenance/configuration.md
@@ -63,6 +63,7 @@ during the Fluss cluster working.
 | coordinator.producer-offsets.cleanup-interval | Duration | 1h | The interval for cleaning up expired producer offsets and orphan files in remote storage. The cleanup task runs periodically to remove expired offsets and any orphan files that may have been left behind due to incomplete operations. The default value is 1 hour. |
 | coordinator.lifecycle-throttler.inflight-timeout | Duration | 3min | The timeout for an in-flight drop event in the coordinator's TableLifecycleThrottler. If a drop event has been admitted but the corresponding completion callback has not arrived within this timeout, the throttler abandons tracking of that drop and continues admitting the next pending drop. |
 | coordinator.lifecycle-throttler.timeout-check-interval | Duration | 1min | The periodic interval at which the coordinator's TableLifecycleThrottler scans in-flight drops for timeouts. |
+| coordinator.offline-leader.retry-interval | Duration | 1min | The interval at which the coordinator retries offline leaders on live tablet servers. This lets a leader that was rejected because of temporary tablet-server conditions, such as disk write protection, become electable again after recovery. |
 
 ## TabletServer