Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import java.util.ArrayList;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
Expand Down Expand Up @@ -125,14 +127,50 @@ public static <TEnv> void restart(ProcedureExecutor<TEnv> procExecutor,
stopAction.call();
}
procExecutor.join();
procExecutor.getScheduler().clear();
// HBASE-29555: Callbacks that wake up a procedure after an async operation (such as updating
// meta) run on the asyncTaskExecutor, not on a PEWorker thread. ProcedureExecutor.stop() shuts
// the asyncTaskExecutor down, but ProcedureExecutor.join() only waits a bounded amount of time
// for it to terminate before giving up. If such a wake-up task is still pending, it can call
// scheduler.addFront() after we clear the scheduler below and after the executor has been
// restarted, leaving the scheduler queue non-empty and tripping the Preconditions check in
// ProcedureExecutor.load(). Since we reuse the same executor (and scheduler) instance across
// this simulated restart, wait for the already shut down asyncTaskExecutor to fully terminate
// so any pending wake-up task has finished before we clear the scheduler.
awaitAsyncTaskExecutorTermination(procExecutor);

// nothing running...

// re-start
LOG.info("RESTART - Start");
procStore.start(storeThreads);
procExecutor.init(execThreads, failOnCorrupted);
// HBASE-29555: External events (for example a region server reporting a region state transition
// over RPC) can wake a procedure and push it back into the scheduler in the small window
// between clearing the scheduler and ProcedureExecutor.load() checking that it is empty. Those
// producers run on threads we do not own here (RPC handlers etc.), so we cannot reliably stop
// them from this thread. Instead, reload in a retry loop: if load() fails only because the
// scheduler is not empty, stop/drain/clear again and retry. ProcedureExecutor.stop() is
// explicitly safe to call after a failed init(), so this is a clean redo of the reload.
final int maxRestartAttempts = 20;
for (int attempt = 1;; attempt++) {
procExecutor.getScheduler().clear();
procStore.start(storeThreads);
try {
procExecutor.init(execThreads, failOnCorrupted);
break;
} catch (IllegalArgumentException e) {
if (
attempt >= maxRestartAttempts || e.getMessage() == null
|| !e.getMessage().contains("scheduler queue not empty")
) {
throw e;
}
LOG.warn("RESTART - scheduler was repopulated during reload (attempt {}/{}), retrying",
attempt, maxRestartAttempts, e);
procExecutor.stop();
procStore.stop(abort);
procExecutor.join();
awaitAsyncTaskExecutorTermination(procExecutor);
}
}
if (actionBeforeStartWorker != null) {
actionBeforeStartWorker.call();
}
Expand All @@ -147,6 +185,32 @@ public static <TEnv> void restart(ProcedureExecutor<TEnv> procExecutor,
}
}

/**
 * Waits for the asyncTaskExecutor of the given executor to fully terminate, so that any pending
 * callback (e.g. one that would wake up a procedure by adding it back to the scheduler) has
 * finished running. This method only waits; it never initiates a shutdown, so the caller is
 * expected to have shut the asyncTaskExecutor down already. See HBASE-29555 and the caller in
 * {@link #restart(ProcedureExecutor, boolean, boolean, Callable, Callable, Callable, boolean, boolean)}.
 * <p>
 * The wait is bounded to 60 seconds. If the asyncTaskExecutor has not terminated by then, a
 * warning is logged and the method returns normally. If the calling thread is interrupted while
 * waiting, a warning is logged, the interrupt status is restored, and the method returns.
 * @param procExecutor the executor whose asyncTaskExecutor is awaited; nothing happens if
 *                     {@code getAsyncTaskExecutor()} returns {@code null}
 */
private static void awaitAsyncTaskExecutorTermination(ProcedureExecutor<?> procExecutor) {
  ExecutorService asyncTaskExecutor = procExecutor.getAsyncTaskExecutor();
  if (asyncTaskExecutor == null) {
    return;
  }
  final long timeoutSeconds = 60;
  try {
    // Let awaitTermination track the timeout itself instead of comparing against a wall-clock
    // deadline here: System.currentTimeMillis() can jump when the system clock is adjusted,
    // which would shorten or lengthen the bound.
    if (!asyncTaskExecutor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
      LOG.warn("asyncTaskExecutor did not terminate in time while restarting;"
        + " there may still be pending async tasks");
    }
  } catch (InterruptedException e) {
    LOG.warn("Interrupted while waiting for asyncTaskExecutor to terminate while restarting", e);
    // Preserve the interrupt for callers further up the stack.
    Thread.currentThread().interrupt();
  }
}

public static void storeRestart(ProcedureStore procStore, ProcedureStore.ProcedureLoader loader)
throws Exception {
storeRestart(procStore, false, loader);
Expand Down