Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ CompletableFuture<Integer> submitCommit(
final WriteInfo info = writeInfos.get(index);
if (info == null) {
return JavaUtils.completeExceptionally(
new IOException(name.get() + " is already committed."));
new IOException(name.get() + " not found."));
}

final CheckedSupplier<Integer, IOException> task = LogUtils.newCheckedSupplier(LOG, () -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,17 @@ private static <OUTPUT, THROWABLE extends Throwable> OUTPUT watchImpl(

public long write(String path, long offset, boolean close, ByteBuffer buffer, boolean sync)
throws IOException {
LOG.trace("write {}, offset={}, length={}, close? {}", path, offset, buffer.remaining(), close);
try {
return writeImpl(path, offset, close, buffer, sync);
} catch (IOException e) {
LOG.error("Failed to write {}, offset {}, close? {}, sync? {}", path, offset, close, sync, e);
throw e;
}
}

private long writeImpl(String path, long offset, boolean close, ByteBuffer buffer, boolean sync)
throws IOException {
final int chunkSize = FileStoreCommon.getChunkSize(buffer.remaining());
buffer.limit(chunkSize);
final ByteString reply = writeImpl(this::send, path, offset, close, buffer, sync);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.statemachine.StateMachineStorage;
Expand All @@ -44,8 +45,23 @@
import java.io.IOException;
import java.nio.file.Path;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

public class FileStoreStateMachine extends BaseStateMachine {
static class SimulateFailure {
static final String PATH_TO_FAIL = "path-to-fail";
private static final AtomicReference<RaftPeerId> CHOSEN = new AtomicReference<>();

static boolean chooseServer(RaftPeerId serverId) {
if (SimulateFailure.CHOSEN.compareAndSet(null, serverId)) {
LOG.info("Server {} is chosen", serverId);
return true;
} else {
return serverId.equals(SimulateFailure.CHOSEN.get());
}
}
}

private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage();

private final FileStore files;
Expand Down Expand Up @@ -120,6 +136,15 @@ public TransactionContext startTransaction(LogEntryProto entry, RaftProtos.RaftP
.build();
}

void simulateFailure(String path) throws Exception {
final RaftServer.Division division = getServer().get().getDivision(getGroupId());
if (path.equals(SimulateFailure.PATH_TO_FAIL)
&& division.getInfo().isFollower()
&& SimulateFailure.chooseServer(division.getId())) {
throw new IOException(getId() + ": Simulated failure for path " + path);
}
}

@Override
public CompletableFuture<Integer> write(LogEntryProto entry, TransactionContext context) {
final FileStoreRequestProto proto = getProto(context, entry);
Expand All @@ -128,6 +153,12 @@ public CompletableFuture<Integer> write(LogEntryProto entry, TransactionContext
}

final WriteRequestHeaderProto h = proto.getWriteHeader();
try {
simulateFailure(h.getPath().toStringUtf8());
} catch (Exception e) {
return FileStoreCommon.completeExceptionally(entry.getIndex(), getId() + ": Failed simulateFailure", e);
}

final CompletableFuture<Integer> f = files.write(entry.getIndex(),
h.getPath().toStringUtf8(), h.getClose(), h.getSync(), h.getOffset(),
entry.getStateMachineLogEntry().getStateMachineEntry().getStateMachineData());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.apache.ratis.examples.filestore.FileStoreStateMachine.SimulateFailure.PATH_TO_FAIL;

public abstract class FileStoreBaseTest<CLUSTER extends MiniRaftCluster>
extends BaseTest
implements MiniRaftCluster.Factory.Get<CLUSTER> {
Expand Down Expand Up @@ -144,6 +146,18 @@ public void testFileStore() throws Exception {
cluster.shutdown();
}

@Test
public void testWriteStateMachineDataFailure() throws Exception {
final CLUSTER cluster = newCluster(NUM_PEERS);
cluster.start();
RaftTestUtil.waitForLeader(cluster);

final CheckedSupplier<FileStoreClient, IOException> newClient = () -> newFileStoreClient(cluster);
testSingleFile(PATH_TO_FAIL, SizeInBytes.valueOf("100k"), newClient);
testSingleFile("bar", SizeInBytes.valueOf("2k"), newClient);
cluster.shutdown();
}

private static FileStoreWriter writeSingleFile(
String path, SizeInBytes fileLength, CheckedSupplier<FileStoreClient, IOException> newClient)
throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,6 @@ FileStoreWriter write(boolean sync) throws IOException {

final ByteBuffer b = randomBytes(length, r);

LOG.trace("write {}, offset={}, length={}, close? {}",
fileName, offset, length, close);
final long written = client.write(fileName, offset, close, b, sync);
Assertions.assertEquals(length, written);
offset += length;
Expand Down Expand Up @@ -221,6 +219,8 @@ FileStoreWriter verify() throws IOException {
verify(read, offset, n, expected);
offset += n;
}

LOG.info("Verify successful: {}", fileName);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,31 +129,21 @@ boolean isError() {
}
}

static class ReplyState {
class ReplyState {
private boolean firstReplyReceived = false;
private int errorCount = 0;

synchronized boolean isFirstReplyReceived() {
return firstReplyReceived;
}

synchronized int getErrorCount() {
return errorCount;
}

int process(AppendResult result) {
return process(result == AppendResult.INCONSISTENCY? Event.APPEND_ENTRIES_INCONSISTENCY_REPLY
: Event.APPEND_ENTRIES_REPLY);
}

synchronized int process(Event event) {
firstReplyReceived = event.updateFirstReplyReceived(firstReplyReceived);
if (event.isError()) {
errorCount++;
} else {
errorCount = 0;
}
return errorCount;
return getFollower().getErrorState().updateErrorCount(event.isError());
}
}

Expand Down Expand Up @@ -300,17 +290,25 @@ private boolean isSlowFollower() {
private void mayWait() {
// use lastSend time instead of lastResponse time
try {
getEventAwaitForSignal().await(getWaitTimeMs() + errorWaitTimeMs(),
TimeUnit.MILLISECONDS);
// sleepForErrors won't be waked up by signal
sleepForErrors();
// await can be waked up by signal
getEventAwaitForSignal().await(getWaitTimeMs(), TimeUnit.MILLISECONDS);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
LOG.warn("{} is interrupted: {}", this, ie.toString());
}
}

private long errorWaitTimeMs() {
return errorRetryWaitPolicy.handleAttemptFailure(replyState::getErrorCount)
.getSleepTime().toLong(TimeUnit.MILLISECONDS);
private void sleepForErrors() throws InterruptedException {
final int errorCount = getFollower().getErrorState().getErrorCountToDelay();
if (errorCount < 1) {
return;
}

final TimeDuration sleepTime = errorRetryWaitPolicy.handleAttemptFailure(() -> errorCount).getSleepTime();
LOG.debug("{}: sleepForErrors {}, errorCount={}", this, sleepTime, errorCount);
sleepTime.sleep();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,34 @@ public interface FollowerInfo {

/** Update lastRpcResponseTime and LastRespondedAppendEntriesSendTime */
void updateLastRespondedAppendEntriesSendTime(Timestamp sendTime);

/** @return the error state. */
ErrorState getErrorState();

/** Error state such as the count for consecutive errors. */
interface ErrorState {
/**
* If it is an error, increment the count; otherwise, reset the count to 0.
*
* @return the updated error count.
*/
int updateErrorCount(boolean isError);

/**
* Each error count is returned only once.
* For the subsequent calls of the same error count, it returns 0.
* <p>
* For example,
* 1. Error count is 3
* 2. Calling getErrorCountToDelay() returns 3
* 3. Calling getErrorCountToDelay() again returns 0
* 4. Calling updateErrorCount(true) to increment error count to 4
* 5. Calling getErrorCountToDelay() returns 4
* 6. Calling getErrorCountToDelay() again returns 0
* 7. Calling updateErrorCount(false) resets error count to 0
*
* @return each error count only once.
*/
int getErrorCountToDelay();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class FollowerInfoImpl implements FollowerInfo {
private final RaftLogIndex matchIndex = new RaftLogIndex("matchIndex", RaftLog.INVALID_LOG_INDEX);
private final RaftLogIndex commitIndex = new RaftLogIndex("commitIndex", RaftLog.INVALID_LOG_INDEX);
private final RaftLogIndex snapshotIndex = new RaftLogIndex("snapshotIndex", 0L);
private final ErrorState errorState = new ErrorStateImpl();
private volatile boolean caughtUp;
private volatile boolean ackInstallSnapshotAttempt = false;

Expand Down Expand Up @@ -240,4 +241,35 @@ public Timestamp getLastRespondedAppendEntriesSendTime() {
public void updateLastRespondedAppendEntriesSendTime(Timestamp sendTime) {
lastRespondedAppendEntriesSendTime.set(sendTime);
}

@Override
public ErrorState getErrorState() {
return errorState;
}

static class ErrorStateImpl implements ErrorState {
/** The number of consecutive errors without getting a successful reply for a particular follower. */
private int errorCount = 0;
private int lastReturnedErrorCount = 0;

@Override
public synchronized int updateErrorCount(boolean isError) {
if (isError) {
errorCount++;
} else {
errorCount = 0;
lastReturnedErrorCount = 0;
}
return errorCount;
}

@Override
public synchronized int getErrorCountToDelay() {
if (errorCount == lastReturnedErrorCount) {
return 0;
}
lastReturnedErrorCount = errorCount;
return errorCount;
}
}
}