Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,16 @@ public class ConfigOptions {
+ "TableLifecycleThrottler scans in-flight drops for "
+ "timeouts.");

/**
 * Delay between two consecutive coordinator attempts to retry offline leaders that sit on live
 * tablet servers. Defaults to one minute. The coordinator event processor rejects a value that
 * is not strictly positive (in milliseconds) when it is constructed.
 */
public static final ConfigOption<Duration> COORDINATOR_OFFLINE_LEADER_RETRY_INTERVAL =
key("coordinator.offline-leader.retry-interval")
.durationType()
.defaultValue(Duration.ofMinutes(1))
.withDescription(
"The interval at which the coordinator retries offline leaders on "
+ "live tablet servers. This lets a leader that was rejected "
+ "because of temporary tablet-server conditions, such as disk "
+ "write protection, become electable again after recovery.");

public static final ConfigOption<Boolean> LOG_TABLE_ALLOW_CREATION =
key("allow.create.log.tables")
.booleanType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,41 @@ public void removeOfflineBucketInServer(int serverId) {
replicasOnOffline.remove(serverId);
}

/**
 * Clears the offline marker of a single table bucket on the given tablet server.
 *
 * <p>Nothing happens when the server has no recorded offline buckets. Once the last offline
 * bucket of a server has been cleared, the server's entry is dropped from the map as well.
 */
public void removeOfflineBucketInServer(TableBucket tableBucket, int serverId) {
    replicasOnOffline.computeIfPresent(
            serverId,
            (ignoredServerId, offlineBuckets) -> {
                offlineBuckets.remove(tableBucket);
                // Returning null removes the mapping, so no empty set is left behind.
                return offlineBuckets.isEmpty() ? null : offlineBuckets;
            });
}

/**
 * Collects every replica that is marked offline although its tablet server is currently live.
 *
 * <p>Entries in {@code replicasOnOffline} feed into {@link #isReplicaOnline(int, TableBucket)},
 * which means such replicas are excluded from leader election even while their tablet server
 * stays live. Some causes, for example disk write protection, can clear up without restarting
 * the tablet server, so the coordinator periodically probes these replicas again.
 *
 * @return a newly created set of offline replicas hosted on live tablet servers; offline
 *     markers that belong to servers outside the live set are left out
 */
public Set<TableBucketReplica> offlineReplicasOnLiveTabletServers() {
    Set<Integer> aliveServerIds = liveTabletServerSet();
    Set<TableBucketReplica> retryCandidates = new HashSet<>();
    replicasOnOffline.forEach(
            (serverId, offlineBuckets) -> {
                if (aliveServerIds.contains(serverId)) {
                    offlineBuckets.forEach(
                            bucket ->
                                    retryCandidates.add(
                                            new TableBucketReplica(bucket, serverId)));
                }
            });
    return retryCandidates;
}

// ---- Pending leader activation tracking (for Cluster Health API) ----

public void addPendingLeaderActivation(TableBucket bucket) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import org.apache.fluss.server.coordinator.event.RebalanceTaskTimeoutEvent;
import org.apache.fluss.server.coordinator.event.RemoveServerTagEvent;
import org.apache.fluss.server.coordinator.event.ResumeDropEvent;
import org.apache.fluss.server.coordinator.event.RetryOfflineLeaderEvent;
import org.apache.fluss.server.coordinator.event.SchemaChangeEvent;
import org.apache.fluss.server.coordinator.event.TableRegistrationChangeEvent;
import org.apache.fluss.server.coordinator.event.watcher.CoordinatorChangeWatcher;
Expand Down Expand Up @@ -126,6 +127,7 @@
import org.apache.fluss.utils.AutoPartitionStrategy;
import org.apache.fluss.utils.clock.Clock;
import org.apache.fluss.utils.clock.SystemClock;
import org.apache.fluss.utils.concurrent.ExecutorThreadFactory;
import org.apache.fluss.utils.types.Tuple2;

import org.slf4j.Logger;
Expand All @@ -146,6 +148,9 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.apache.fluss.server.coordinator.statemachine.BucketState.OfflineBucket;
Expand Down Expand Up @@ -188,6 +193,8 @@ public class CoordinatorEventProcessor implements EventProcessor {
private final String internalListenerName;
private final CoordinatorMetricGroup coordinatorMetricGroup;
private final RebalanceManager rebalanceManager;
private final ScheduledExecutorService offlineLeaderRetryScheduler;
private final long offlineLeaderRetryIntervalMs;
private final CompletedSnapshotStoreManager completedSnapshotStoreManager;
private final LakeTableHelper lakeTableHelper;

Expand Down Expand Up @@ -262,6 +269,18 @@ public CoordinatorEventProcessor(
this.rebalanceManager =
new RebalanceManager(
this, zooKeeperClient, coordinatorEventManager, SystemClock.getInstance());
this.offlineLeaderRetryIntervalMs =
conf.get(ConfigOptions.COORDINATOR_OFFLINE_LEADER_RETRY_INTERVAL).toMillis();
if (offlineLeaderRetryIntervalMs <= 0) {
throw new IllegalArgumentException(
String.format(
"%s must be positive, but was %d ms.",
ConfigOptions.COORDINATOR_OFFLINE_LEADER_RETRY_INTERVAL.key(),
offlineLeaderRetryIntervalMs));
}
this.offlineLeaderRetryScheduler =
Executors.newSingleThreadScheduledExecutor(
new ExecutorThreadFactory("offline-leader-retry"));
this.ioExecutor = ioExecutor;
this.lakeTableHelper =
new LakeTableHelper(zooKeeperClient, conf.getString(ConfigOptions.REMOTE_DATA_DIR));
Expand Down Expand Up @@ -322,9 +341,12 @@ public void startup() {
// start rebalance manager.
rebalanceManager.startup();
rebalanceManager.start();

startOfflineLeaderRetryScheduler();
}

public void shutdown() {
offlineLeaderRetryScheduler.shutdownNow();
// close the event manager
coordinatorEventManager.close();
rebalanceManager.close();
Expand Down Expand Up @@ -587,6 +609,25 @@ private void onShutdown() {
tabletServerChangeWatcher.stop();
}

/**
 * Registers the periodic task that enqueues offline-leader retry events.
 *
 * <p>The configured retry interval is used both as the initial delay and as the fixed delay
 * between the end of one run and the start of the next.
 */
private void startOfflineLeaderRetryScheduler() {
    final long retryDelayMs = offlineLeaderRetryIntervalMs;
    final Runnable enqueueRetryEvent = this::enqueueRetryOfflineLeaderEventSafely;

    offlineLeaderRetryScheduler.scheduleWithFixedDelay(
            enqueueRetryEvent, retryDelayMs, retryDelayMs, TimeUnit.MILLISECONDS);

    LOG.info("Offline leader retry scheduler started: retryIntervalMs={}", retryDelayMs);
}

/**
 * Puts a {@link RetryOfflineLeaderEvent} on the coordinator event manager without ever
 * throwing.
 *
 * <p>This method is the periodic task of the offline-leader retry scheduler. A periodic task
 * that throws has its subsequent executions suppressed by the scheduled executor, so every
 * failure is logged and swallowed here to keep the retry loop running.
 */
private void enqueueRetryOfflineLeaderEventSafely() {
    try {
        RetryOfflineLeaderEvent retryEvent = new RetryOfflineLeaderEvent();
        coordinatorEventManager.put(retryEvent);
    } catch (Throwable failure) {
        LOG.warn("Failed to enqueue retry offline leader event.", failure);
    }
}

@Override
public void process(CoordinatorEvent event) {
if (event instanceof CreateTableEvent) {
Expand All @@ -605,6 +646,8 @@ public void process(CoordinatorEvent event) {
} else if (event instanceof NotifyLeaderAndIsrResponseReceivedEvent) {
processNotifyLeaderAndIsrResponseReceivedEvent(
(NotifyLeaderAndIsrResponseReceivedEvent) event);
} else if (event instanceof RetryOfflineLeaderEvent) {
processRetryOfflineLeader();
} else if (event instanceof DeleteReplicaResponseReceivedEvent) {
processDeleteReplicaResponseReceived((DeleteReplicaResponseReceivedEvent) event);
} else if (event instanceof NewCoordinatorEvent) {
Expand Down Expand Up @@ -702,6 +745,42 @@ public void process(CoordinatorEvent event) {
}
}

/**
 * Handles a {@link RetryOfflineLeaderEvent}: gives replicas that are marked offline on live
 * tablet servers another chance to come online and then asks the bucket state machine to
 * trigger online bucket state changes.
 *
 * <p>Replicas whose bucket is queued for deletion, or whose tracked state is not
 * {@code OfflineReplica}, are left untouched. When no replica qualifies, nothing is done.
 */
private void processRetryOfflineLeader() {
    Set<TableBucketReplica> retryCandidates =
            coordinatorContext.offlineReplicasOnLiveTabletServers().stream()
                    .filter(
                            candidate ->
                                    !coordinatorContext.isToBeDeleted(
                                                    candidate.getTableBucket())
                                            && coordinatorContext.getReplicaState(candidate)
                                                    == OfflineReplica)
                    .collect(Collectors.toSet());

    if (!retryCandidates.isEmpty()) {
        LOG.info(
                "Retrying {} offline replicas on live tablet servers before triggering "
                        + "offline leader election: {}.",
                retryCandidates.size(),
                retryCandidates);

        // CoordinatorContext#isReplicaOnline consults replicasOnOffline, and leader election
        // relies on that check. The marker therefore has to be cleared before the live server
        // is probed again; should the server still be unable to lead, NotifyLeaderAndIsr fails
        // and the replica goes back to OfflineReplica.
        // TODO: once standby replicas maintain local KV snapshots, distinguish standby
        // promotion from fresh snapshot download when retrying KV leaders.
        retryCandidates.forEach(
                candidate ->
                        coordinatorContext.removeOfflineBucketInServer(
                                candidate.getTableBucket(), candidate.getReplica()));

        replicaStateMachine.handleStateChanges(retryCandidates, OnlineReplica);
        tableBucketStateMachine.triggerOnlineBucketStateChange();
    }
}

private void processCreateTable(CreateTableEvent createTableEvent) {
long tableId = createTableEvent.getTableInfo().getTableId();
// skip the table if it already exists
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.server.coordinator.event;

/**
 * An event for retrying offline leaders on live tablet servers.
 *
 * <p>The event carries no payload. The coordinator event processor enqueues an instance from its
 * offline-leader retry scheduler and reacts to it by probing again the replicas that are marked
 * offline on live tablet servers.
 */
public class RetryOfflineLeaderEvent implements CoordinatorEvent {}
Original file line number Diff line number Diff line change
Expand Up @@ -1824,7 +1824,11 @@ protected boolean validateAndGetIsBecomeLeader(NotifyLeaderAndIsrData data) {
if (requestLeaderEpoch >= currentLeaderEpoch) {
if (data.getReplicas().contains(serverId)) {
int leaderId = data.getLeader();
return leaderId == serverId;
boolean becomeLeader = leaderId == serverId;
if (becomeLeader) {
ensureWritableForNewKvLeader(replica, requestLeaderEpoch);
}
return becomeLeader;
} else {
String errorMessage =
String.format(
Expand All @@ -1845,6 +1849,17 @@ protected boolean validateAndGetIsBecomeLeader(NotifyLeaderAndIsrData data) {
}
}

/**
 * Checks local disk writability before this server becomes the leader of a KV table replica.
 *
 * <p>The check only runs for KV tables, and only when the requested leader epoch is strictly
 * newer than the replica's current leader epoch; in every other case the method returns
 * without doing anything. The check itself is delegated to
 * {@code localDiskManager.ensureWritable()}, which presumably fails while disk write
 * protection is active (confirm against the disk manager).
 *
 * @param replica the local replica that is about to become leader
 * @param requestLeaderEpoch the leader epoch carried by the notify-leader-and-isr request
 */
private void ensureWritableForNewKvLeader(Replica replica, int requestLeaderEpoch) {
    boolean advancesKvLeaderEpoch =
            replica.isKvTable() && requestLeaderEpoch > replica.getLeaderEpoch();
    if (advancesKvLeaderEpoch) {
        // TODO: Once standby replicas maintain local KV snapshots, allow promoting a standby
        // replica while disk write protection is active because it should not need to
        // download a large remote snapshot during make-leader.
        localDiskManager.ensureWritable();
    }
}

/**
* If all the following conditions are true, we need to put a delayed write operation into the
* delayed write manager and wait for replication to complete.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.metadata.Schema;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TableBucketReplica;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
Expand Down Expand Up @@ -78,6 +79,41 @@ void testGetLakeTableCount() {
assertThat(context.getLakeTableCount()).isEqualTo(2);
}

@Test
void testOfflineReplicasOnLiveTabletServersOnlyReturnsLiveServers() {
    CoordinatorContext coordinatorContext = new CoordinatorContext(ZkEpoch.INITIAL_EPOCH);
    // Servers 0 and 1 are live; server 2 is never registered as live.
    coordinatorContext.setLiveTabletServers(
            Arrays.asList(createTabletServer(0), createTabletServer(1)));

    TableBucket bucketOnLiveServer = new TableBucket(1L, 0);
    TableBucket bucketOnDeadServer = new TableBucket(1L, 1);
    coordinatorContext.addOfflineBucketInServer(bucketOnLiveServer, 0);
    coordinatorContext.addOfflineBucketInServer(bucketOnDeadServer, 2);

    // Only the offline marker that belongs to a live server may be reported.
    TableBucketReplica expectedReplica = new TableBucketReplica(bucketOnLiveServer, 0);
    assertThat(coordinatorContext.offlineReplicasOnLiveTabletServers())
            .containsExactly(expectedReplica);
}

@Test
void testRemoveOfflineBucketInServerForBucket() {
    final int serverId = 0;
    CoordinatorContext coordinatorContext = new CoordinatorContext(ZkEpoch.INITIAL_EPOCH);
    coordinatorContext.setLiveTabletServers(
            Collections.singletonList(createTabletServer(serverId)));

    TableBucket firstBucket = new TableBucket(1L, 0);
    TableBucket secondBucket = new TableBucket(1L, 1);
    coordinatorContext.addOfflineBucketInServer(firstBucket, serverId);
    coordinatorContext.addOfflineBucketInServer(secondBucket, serverId);

    // Clearing one bucket must leave the other bucket's offline marker in place.
    coordinatorContext.removeOfflineBucketInServer(firstBucket, serverId);
    assertThat(coordinatorContext.isReplicaOnline(serverId, firstBucket)).isTrue();
    assertThat(coordinatorContext.isReplicaOnline(serverId, secondBucket)).isFalse();
    assertThat(coordinatorContext.offlineReplicasOnLiveTabletServers())
            .containsExactly(new TableBucketReplica(secondBucket, serverId));

    // Clearing the last bucket leaves no offline replica behind for the server.
    coordinatorContext.removeOfflineBucketInServer(secondBucket, serverId);
    assertThat(coordinatorContext.isReplicaOnline(serverId, secondBucket)).isTrue();
    assertThat(coordinatorContext.offlineReplicasOnLiveTabletServers()).isEmpty();
}

// ---- Pending Leader Activation Tracking Tests ----

@Test
Expand Down Expand Up @@ -215,4 +251,12 @@ private TableInfo createTableInfo(long tableId, TablePath tablePath, boolean isL
System.currentTimeMillis(),
System.currentTimeMillis());
}

/** Builds a tablet-server {@code ServerInfo} whose rack and client endpoint derive from the id. */
private ServerInfo createTabletServer(int serverId) {
    String rack = "RACK" + serverId;
    String listeners = "CLIENT://host" + serverId + ":9124";
    return new ServerInfo(
            serverId,
            rack,
            Endpoint.fromListenersString(listeners),
            ServerType.TABLET_SERVER);
}
}
Loading
Loading