From f7ebd80ac65a0675f9e474d309d0029d1bf1bc5f Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Tue, 21 Apr 2026 01:05:48 -0700 Subject: [PATCH] RATIS-2506. Delay consecutive GrpcLogAppender restart. --- .../ratis/examples/filestore/FileInfo.java | 2 +- .../examples/filestore/FileStoreClient.java | 11 +++++++ .../filestore/FileStoreStateMachine.java | 31 ++++++++++++++++++ .../examples/filestore/FileStoreBaseTest.java | 14 ++++++++ .../examples/filestore/FileStoreWriter.java | 4 +-- .../ratis/grpc/server/GrpcLogAppender.java | 32 +++++++++---------- .../ratis/server/leader/FollowerInfo.java | 30 +++++++++++++++++ .../ratis/server/impl/FollowerInfoImpl.java | 32 +++++++++++++++++++ 8 files changed, 136 insertions(+), 20 deletions(-) diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java index c7d8cb7cd1..c3ec21f64b 100644 --- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java +++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java @@ -276,7 +276,7 @@ CompletableFuture submitCommit( final WriteInfo info = writeInfos.get(index); if (info == null) { return JavaUtils.completeExceptionally( - new IOException(name.get() + " is already committed.")); + new IOException(name.get() + " not found.")); } final CheckedSupplier task = LogUtils.newCheckedSupplier(LOG, () -> { diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java index c223b100b3..fb0ee49dc2 100644 --- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java +++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java @@ -171,6 +171,17 @@ private static OUTPUT watchImpl( public long write(String path, long offset, boolean close, ByteBuffer buffer, boolean sync) throws IOException { + LOG.trace("write {}, offset={}, length={}, close? {}", path, offset, buffer.remaining(), close); + try { + return writeImpl(path, offset, close, buffer, sync); + } catch (IOException e) { + LOG.error("Failed to write {}, offset {}, close? {}, sync? {}", path, offset, close, sync, e); + throw e; + } + } + + private long writeImpl(String path, long offset, boolean close, ByteBuffer buffer, boolean sync) + throws IOException { final int chunkSize = FileStoreCommon.getChunkSize(buffer.remaining()); buffer.limit(chunkSize); final ByteString reply = writeImpl(this::send, path, offset, close, buffer, sync); diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java index 5f258ee3b7..d9a1463b93 100644 --- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java +++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java @@ -31,6 +31,7 @@ import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.RaftClientRequest; import org.apache.ratis.protocol.RaftGroupId; +import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.storage.RaftStorage; import org.apache.ratis.statemachine.StateMachineStorage; @@ -44,8 +45,23 @@ import java.io.IOException; import java.nio.file.Path; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReference; public class FileStoreStateMachine extends BaseStateMachine { + static class SimulateFailure { + static final String PATH_TO_FAIL = "path-to-fail"; + private static final AtomicReference CHOSEN = new AtomicReference<>(); + + static boolean chooseServer(RaftPeerId serverId) { + if (SimulateFailure.CHOSEN.compareAndSet(null, serverId)) { + LOG.info("Server {} is chosen", serverId); + return true; + } else { + return serverId.equals(SimulateFailure.CHOSEN.get()); + } + } + } + private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage(); private final FileStore files; @@ -120,6 +136,15 @@ public TransactionContext startTransaction(LogEntryProto entry, RaftProtos.RaftP .build(); } + void simulateFailure(String path) throws Exception { + final RaftServer.Division division = getServer().get().getDivision(getGroupId()); + if (path.equals(SimulateFailure.PATH_TO_FAIL) + && division.getInfo().isFollower() + && SimulateFailure.chooseServer(division.getId())) { + throw new IOException(getId() + ": Simulated failure for path " + path); + } + } + @Override public CompletableFuture write(LogEntryProto entry, TransactionContext context) { final FileStoreRequestProto proto = getProto(context, entry); @@ -128,6 +153,12 @@ public CompletableFuture write(LogEntryProto entry, TransactionContext } final WriteRequestHeaderProto h = proto.getWriteHeader(); + try { + simulateFailure(h.getPath().toStringUtf8()); + } catch (Exception e) { + return FileStoreCommon.completeExceptionally(entry.getIndex(), getId() + ": Failed simulateFailure", e); + } + final CompletableFuture f = files.write(entry.getIndex(), h.getPath().toStringUtf8(), h.getClose(), h.getSync(), h.getOffset(), entry.getStateMachineLogEntry().getStateMachineEntry().getStateMachineData()); diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreBaseTest.java b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreBaseTest.java index 07668e0f2d..e363a0f70b 100644 --- a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreBaseTest.java +++ b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreBaseTest.java @@ -47,6 +47,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import static org.apache.ratis.examples.filestore.FileStoreStateMachine.SimulateFailure.PATH_TO_FAIL; + public abstract class FileStoreBaseTest extends BaseTest implements MiniRaftCluster.Factory.Get { @@ -144,6 +146,18 @@ public void testFileStore() throws Exception { cluster.shutdown(); } + @Test + public void testWriteStateMachineDataFailure() throws Exception { + final CLUSTER cluster = newCluster(NUM_PEERS); + cluster.start(); + RaftTestUtil.waitForLeader(cluster); + + final CheckedSupplier newClient = () -> newFileStoreClient(cluster); + testSingleFile(PATH_TO_FAIL, SizeInBytes.valueOf("100k"), newClient); + testSingleFile("bar", SizeInBytes.valueOf("2k"), newClient); + cluster.shutdown(); + } + private static FileStoreWriter writeSingleFile( String path, SizeInBytes fileLength, CheckedSupplier newClient) throws Exception { diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreWriter.java b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreWriter.java index c0f7d08097..480fe40d1b 100644 --- a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreWriter.java +++ b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreWriter.java @@ -126,8 +126,6 @@ FileStoreWriter write(boolean sync) throws IOException { final ByteBuffer b = randomBytes(length, r); - LOG.trace("write {}, offset={}, length={}, close? {}", - fileName, offset, length, close); final long written = client.write(fileName, offset, close, b, sync); Assertions.assertEquals(length, written); offset += length; @@ -221,6 +219,8 @@ FileStoreWriter verify() throws IOException { verify(read, offset, n, expected); offset += n; } + + LOG.info("Verify successful: {}", fileName); return this; } diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java index 053cc5c0f4..29867b544e 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java @@ -129,18 +129,13 @@ boolean isError() { } } - static class ReplyState { + class ReplyState { private boolean firstReplyReceived = false; - private int errorCount = 0; synchronized boolean isFirstReplyReceived() { return firstReplyReceived; } - synchronized int getErrorCount() { - return errorCount; - } - int process(AppendResult result) { return process(result == AppendResult.INCONSISTENCY? Event.APPEND_ENTRIES_INCONSISTENCY_REPLY : Event.APPEND_ENTRIES_REPLY); @@ -148,12 +143,7 @@ int process(AppendResult result) { synchronized int process(Event event) { firstReplyReceived = event.updateFirstReplyReceived(firstReplyReceived); - if (event.isError()) { - errorCount++; - } else { - errorCount = 0; - } - return errorCount; + return getFollower().getErrorState().updateErrorCount(event.isError()); } } @@ -300,17 +290,25 @@ private boolean isSlowFollower() { private void mayWait() { // use lastSend time instead of lastResponse time try { - getEventAwaitForSignal().await(getWaitTimeMs() + errorWaitTimeMs(), - TimeUnit.MILLISECONDS); + // sleepForErrors won't be waked up by signal + sleepForErrors(); + // await can be waked up by signal + getEventAwaitForSignal().await(getWaitTimeMs(), TimeUnit.MILLISECONDS); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); LOG.warn("{} is interrupted: {}", this, ie.toString()); } } - private long errorWaitTimeMs() { - return errorRetryWaitPolicy.handleAttemptFailure(replyState::getErrorCount) - .getSleepTime().toLong(TimeUnit.MILLISECONDS); + private void sleepForErrors() throws InterruptedException { + final int errorCount = getFollower().getErrorState().getErrorCountToDelay(); + if (errorCount < 1) { + return; + } + + final TimeDuration sleepTime = errorRetryWaitPolicy.handleAttemptFailure(() -> errorCount).getSleepTime(); + LOG.debug("{}: sleepForErrors {}, errorCount={}", this, sleepTime, errorCount); + sleepTime.sleep(); } @Override diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java index e563745127..7ab6d7fab3 100644 --- a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java +++ b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java @@ -112,4 +112,34 @@ public interface FollowerInfo { /** Update lastRpcResponseTime and LastRespondedAppendEntriesSendTime */ void updateLastRespondedAppendEntriesSendTime(Timestamp sendTime); + + /** @return the error state. */ + ErrorState getErrorState(); + + /** Error state such as the count for consecutive errors. */ + interface ErrorState { + /** + * If it is an error, increment the count; otherwise, reset the count to 0. + * + * @return the updated error count. + */ + int updateErrorCount(boolean isError); + + /** + * Each error count is returned only once. + * For the subsequent calls of the same error count, it returns 0. + *

+ * For example, + * 1. Error count is 3 + * 2. Calling getErrorCountToDelay() returns 3 + * 3. Calling getErrorCountToDelay() again returns 0 + * 4. Calling updateErrorCount(true) to increment error count to 4 + * 5. Calling getErrorCountToDelay() returns 4 + * 6. Calling getErrorCountToDelay() again returns 0 + * 7. Calling updateErrorCount(false) resets error count to 0 + * + * @return each error count only once. + */ + int getErrorCountToDelay(); + } } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java index 7c34c1cb8b..340a12b008 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java @@ -42,6 +42,7 @@ class FollowerInfoImpl implements FollowerInfo { private final RaftLogIndex matchIndex = new RaftLogIndex("matchIndex", RaftLog.INVALID_LOG_INDEX); private final RaftLogIndex commitIndex = new RaftLogIndex("commitIndex", RaftLog.INVALID_LOG_INDEX); private final RaftLogIndex snapshotIndex = new RaftLogIndex("snapshotIndex", 0L); + private final ErrorState errorState = new ErrorStateImpl(); private volatile boolean caughtUp; private volatile boolean ackInstallSnapshotAttempt = false; @@ -240,4 +241,35 @@ public Timestamp getLastRespondedAppendEntriesSendTime() { public void updateLastRespondedAppendEntriesSendTime(Timestamp sendTime) { lastRespondedAppendEntriesSendTime.set(sendTime); } + + @Override + public ErrorState getErrorState() { + return errorState; + } + + static class ErrorStateImpl implements ErrorState { + /** The number of consecutive errors without getting a successful reply for a particular follower. */ + private int errorCount = 0; + private int lastReturnedErrorCount = 0; + + @Override + public synchronized int updateErrorCount(boolean isError) { + if (isError) { + errorCount++; + } else { + errorCount = 0; + lastReturnedErrorCount = 0; + } + return errorCount; + } + + @Override + public synchronized int getErrorCountToDelay() { + if (errorCount == lastReturnedErrorCount) { + return 0; + } + lastReturnedErrorCount = errorCount; + return errorCount; + } + } }