Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder(RaftGroupMemberI
return toRaftRpcRequestProtoBuilder(requestorId.getPeerId().toByteString(), replyId, requestorId.getGroupId());
}

/** For client requests. */
static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder(ClientId clientId, RaftGroupMemberId serverId) {
return toRaftRpcRequestProtoBuilder(clientId.toByteString(), serverId.getPeerId(), serverId.getGroupId());
}

/** For client requests. */
static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder(RaftClientRequest request) {
final RaftRpcRequestProto.Builder b = toRaftRpcRequestProtoBuilder(
Expand Down Expand Up @@ -208,13 +213,9 @@ static ByteBuffer toRaftClientRequestProtoByteBuffer(RaftClientRequest request)
}

static RaftClientRequestProto toRaftClientRequestProto(RaftClientRequest request) {
return toRaftClientRequestProto(request, true);
}

static RaftClientRequestProto toRaftClientRequestProto(RaftClientRequest request, boolean withMsg) {
final RaftClientRequestProto.Builder b = RaftClientRequestProto.newBuilder()
.setRpcRequest(toRaftRpcRequestProtoBuilder(request));
if (withMsg && request.getMessage() != null) {
if (request.getMessage() != null) {
b.setMessage(toClientMessageEntryProtoBuilder(request.getMessage()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,16 @@
import java.util.Collection;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
import org.apache.ratis.proto.RaftProtos.ReadRequestTypeProto;
import org.apache.ratis.protocol.AdminAsynchronousProtocol;
import org.apache.ratis.protocol.AdminProtocol;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.RaftClientAsynchronousProtocol;
import org.apache.ratis.protocol.RaftClientProtocol;
import org.apache.ratis.protocol.RaftGroup;
Expand Down Expand Up @@ -102,6 +106,21 @@ default RaftGroup getGroup() {
/** @return the {@link StateMachine} for this division. */
StateMachine getStateMachine();

/**
* Execute a local read-only query after applying the configured
* {@link RaftServerConfigKeys.Read.Option} consistency checks.
*
* <p>This API is intended for embedded users that already have a local
* server division and want to avoid serializing application read requests
* and responses through {@link org.apache.ratis.protocol.Message}. Remote
* clients should continue to use
* {@link org.apache.ratis.client.api.AsyncApi#sendReadOnly(org.apache.ratis.protocol.Message)}.
*/
default <T> CompletableFuture<T> readOnlyAsync(ClientId clientId, ReadRequestTypeProto readRequestType,
Supplier<CompletableFuture<T>> query) throws IOException {
throw new UnsupportedOperationException("readOnlyAsync is not supported");
}

/** @return the raft log of this division. */
RaftLog getRaftLog();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,14 @@
import org.apache.ratis.proto.RaftProtos.RaftRpcRequestProto;
import org.apache.ratis.proto.RaftProtos.ReadIndexReplyProto;
import org.apache.ratis.proto.RaftProtos.ReadIndexRequestProto;
import org.apache.ratis.proto.RaftProtos.ReadRequestTypeProto;
import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
import org.apache.ratis.proto.RaftProtos.RequestVoteReplyProto;
import org.apache.ratis.proto.RaftProtos.RequestVoteRequestProto;
import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
import org.apache.ratis.proto.RaftProtos.StartLeaderElectionReplyProto;
import org.apache.ratis.proto.RaftProtos.StartLeaderElectionRequestProto;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.ClientInvocationId;
import org.apache.ratis.protocol.GroupInfoReply;
import org.apache.ratis.protocol.GroupInfoRequest;
Expand Down Expand Up @@ -1102,7 +1104,12 @@ private CompletableFuture<ReadIndexReplyProto> sendReadIndexAsync(RaftClientRequ
if (leaderId == null) {
return JavaUtils.completeExceptionally(new ReadIndexException(getMemberId() + ": Leader is unknown."));
}
final ReadIndexRequestProto request = toReadIndexRequestProto(clientRequest, getMemberId(), leaderId);
return sendReadIndexAsync(clientRequest.getClientId(), clientRequest.getType().getRead(), leaderId);
}

private CompletableFuture<ReadIndexReplyProto> sendReadIndexAsync(
ClientId clientId, ReadRequestTypeProto readRequestType, RaftPeerId leaderId) {
final ReadIndexRequestProto request = toReadIndexRequestProto(clientId, readRequestType, getMemberId(), leaderId);
try {
return getServerRpc().async().readIndexAsync(request);
} catch (IOException e) {
Expand All @@ -1114,6 +1121,68 @@ private CompletableFuture<Long> getReadIndex(RaftClientRequest request, LeaderSt
return writeIndexCache.getWriteIndexFuture(request).thenCompose(leader::getReadIndex);
}

private CompletableFuture<Long> getReadIndex(CompletableFuture<ReadIndexReplyProto> readIndexReply) {
return readIndexReply.thenApply(reply -> {
if (reply.getServerReply().getSuccess()) {
return reply.getReadIndex();
} else {
throw new CompletionException(new ReadIndexException(getId()
+ ": Failed to get read index from the leader: " + reply));
}
});
}

private CompletableFuture<Long> getReadIndexForReadOnly(ClientId clientId, ReadRequestTypeProto readRequestType) {
final LeaderStateImpl leader = role.getLeaderState().orElse(null);
if (leader != null) {
return leader.getReadIndex(null);
}

final RaftPeerId leaderId = getInfo().getLeaderId();
if (leaderId == null) {
return JavaUtils.completeExceptionally(new ReadIndexException(getMemberId() + ": Leader is unknown."));
}

return getReadIndex(sendReadIndexAsync(clientId, readRequestType, leaderId));
}

private <T> CompletableFuture<T> checkLeaderStateForReadOnly() {
if (!getInfo().isLeader()) {
return JavaUtils.completeExceptionally(generateNotLeaderException());
}
if (!getInfo().isLeaderReady()) {
return JavaUtils.completeExceptionally(new LeaderNotReadyException(getMemberId()));
}
return null;
}

private static <T> CompletableFuture<T> supplyReadOnly(Supplier<CompletableFuture<T>> query) {
try {
return Objects.requireNonNull(query.get(), "query returned null");
} catch (Throwable t) {
return JavaUtils.completeExceptionally(t);
}
}

@Override
public <T> CompletableFuture<T> readOnlyAsync(ClientId clientId, ReadRequestTypeProto readRequestType,
Supplier<CompletableFuture<T>> query) throws IOException {
Objects.requireNonNull(clientId, "clientId == null");
Objects.requireNonNull(readRequestType, "readRequestType == null");
Objects.requireNonNull(query, "query == null");
assertLifeCycleState(LifeCycle.States.RUNNING);
if (readRequestType.getPreferNonLinearizable() || readOption == RaftServerConfigKeys.Read.Option.DEFAULT) {
final CompletableFuture<T> reply = checkLeaderStateForReadOnly();
return reply != null ? reply : supplyReadOnly(query);
} else if (readOption == RaftServerConfigKeys.Read.Option.LINEARIZABLE) {
return getReadIndexForReadOnly(clientId, readRequestType)
.thenCompose(readIndex -> getReadRequests().waitToAdvance(readIndex))
.thenCompose(readIndex -> supplyReadOnly(query));
} else {
throw new IllegalStateException("Unexpected read option: " + readOption);
}
}

private CompletableFuture<RaftClientReply> readAsync(RaftClientRequest request) {
if (request.getType().getRead().getPreferNonLinearizable()
|| readOption == RaftServerConfigKeys.Read.Option.DEFAULT) {
Expand All @@ -1135,14 +1204,7 @@ private CompletableFuture<RaftClientReply> readAsync(RaftClientRequest request)
if (leader != null) {
replyFuture = getReadIndex(request, leader);
} else {
replyFuture = sendReadIndexAsync(request).thenApply(reply -> {
if (reply.getServerReply().getSuccess()) {
return reply.getReadIndex();
} else {
throw new CompletionException(new ReadIndexException(getId() +
": Failed to get read index from the leader: " + reply));
}
});
replyFuture = getReadIndex(sendReadIndexAsync(request));
}

return replyFuture
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.ratis.client.impl.ClientProtoUtils;
import org.apache.ratis.proto.RaftProtos.*;
import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
Expand Down Expand Up @@ -115,10 +115,13 @@ static InstallSnapshotReplyProto toInstallSnapshotReplyProto(
}

static ReadIndexRequestProto toReadIndexRequestProto(
RaftClientRequest clientRequest, RaftGroupMemberId requestorId, RaftPeerId replyId) {
ClientId clientId, ReadRequestTypeProto readRequestType, RaftGroupMemberId serverId, RaftPeerId leaderId) {
final RaftClientRequestProto.Builder clientRequestBuilder = RaftClientRequestProto.newBuilder()
.setRpcRequest(ClientProtoUtils.toRaftRpcRequestProtoBuilder(clientId, serverId))
.setRead(readRequestType);
return ReadIndexRequestProto.newBuilder()
.setServerRequest(ClientProtoUtils.toRaftRpcRequestProtoBuilder(requestorId, replyId))
.setClientRequest(ClientProtoUtils.toRaftClientRequestProto(clientRequest, false))
.setServerRequest(ClientProtoUtils.toRaftRpcRequestProtoBuilder(serverId, leaderId))
.setClientRequest(clientRequestBuilder)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,21 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.apache.ratis.ReadOnlyRequestTests.CounterStateMachine;
import static org.apache.ratis.ReadOnlyRequestTests.INCREMENT;
import static org.apache.ratis.ReadOnlyRequestTests.QUERY;
import static org.apache.ratis.ReadOnlyRequestTests.WAIT_AND_INCREMENT;
import static org.apache.ratis.ReadOnlyRequestTests.assertLongAtLeast;
import static org.apache.ratis.ReadOnlyRequestTests.assertOption;
import static org.apache.ratis.ReadOnlyRequestTests.assertReplyAtLeast;
import static org.apache.ratis.ReadOnlyRequestTests.assertReplyExact;
import static org.apache.ratis.ReadOnlyRequestTests.getCount;
import static org.apache.ratis.ReadOnlyRequestTests.readOnlyAsync;
import static org.apache.ratis.ReadOnlyRequestTests.readOnlyAsyncPreferNonLinearizable;
import static org.apache.ratis.server.RaftServerConfigKeys.Read.Option.LINEARIZABLE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
Expand Down Expand Up @@ -150,6 +156,7 @@ static <C extends MiniRaftCluster> void runTestFollowerLinearizableRead(C cluste
final int n = 100;
final List<Reply> f0Replies = new ArrayList<>(n);
final List<Reply> f1Replies = new ArrayList<>(n);
final List<CompletableFuture<Long>> f0LocalReplies = new ArrayList<>(n);
try (RaftClient client = cluster.createClient(leaderId);
RaftClient c0 = cluster.createClient(f0);
RaftClient c1 = cluster.createClient(f1);
Expand All @@ -160,12 +167,24 @@ static <C extends MiniRaftCluster> void runTestFollowerLinearizableRead(C cluste

f0Replies.add(new Reply(count, c0.async().sendReadOnly(QUERY, f0)));
f1Replies.add(new Reply(count, c1.async().sendReadOnly(QUERY, f1)));
f0LocalReplies.add(readOnlyAsync(
followers.get(0), () -> CompletableFuture.completedFuture(getCount(followers.get(0)))));
}

for (int i = 0; i < n; i++) {
f0Replies.get(i).assertAtLeast();
f1Replies.get(i).assertAtLeast();
assertLongAtLeast(i + 1, f0LocalReplies.get(i).join());
}

final AtomicBoolean callbackInvoked = new AtomicBoolean();
final CompletableFuture<Long> preferNonLinearizable = readOnlyAsyncPreferNonLinearizable(
followers.get(0), () -> {
callbackInvoked.set(true);
return CompletableFuture.completedFuture(getCount(followers.get(0)));
});
Assertions.assertThrows(CompletionException.class, preferNonLinearizable::join);
Assertions.assertFalse(callbackInvoked.get());
}
}

Expand Down Expand Up @@ -285,4 +304,4 @@ static <C extends MiniRaftCluster> void runTestReadAfterWrite(C cluster) throws
assertReplyAtLeast(2, asyncReply.join());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.exceptions.RaftRetryFailureException;
Expand All @@ -40,9 +42,13 @@
import org.junit.jupiter.api.Test;
import org.slf4j.event.Level;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

public abstract class ReadOnlyRequestTests<CLUSTER extends MiniRaftCluster>
extends BaseTest
Expand Down Expand Up @@ -81,15 +87,28 @@ public void testReadOnly() throws Exception {

static <C extends MiniRaftCluster> void runTestReadOnly(C cluster) throws Exception {
try {
RaftTestUtil.waitForLeader(cluster);
final RaftPeerId leaderId = cluster.getLeader().getId();
final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
final RaftPeerId leaderId = leader.getId();

try (final RaftClient client = cluster.createClient(leaderId)) {
for (int i = 1; i <= 10; i++) {
assertReplyExact(i, client.io().send(INCREMENT));
assertReplyExact(i, client.io().sendReadOnly(QUERY));
Assertions.assertEquals(i, readOnlyAsync(
leader, () -> CompletableFuture.completedFuture(getCount(leader))).get());
}
}

if (RaftServerConfigKeys.Read.option(cluster.getProperties()) == RaftServerConfigKeys.Read.Option.DEFAULT) {
final RaftServer.Division follower = cluster.getFollowers().get(0);
final AtomicBoolean callbackInvoked = new AtomicBoolean();
final CompletableFuture<Long> read = readOnlyAsync(follower, () -> {
callbackInvoked.set(true);
return CompletableFuture.completedFuture(getCount(follower));
});
Assertions.assertThrows(CompletionException.class, read::join);
Assertions.assertFalse(callbackInvoked.get());
}
} finally {
cluster.shutdown();
}
Expand Down Expand Up @@ -144,6 +163,20 @@ static int retrieve(RaftClientReply reply) {
return Integer.parseInt(reply.getMessage().getContent().toString(StandardCharsets.UTF_8));
}

static long getCount(RaftServer.Division server) {
return ((CounterStateMachine) server.getStateMachine()).getCount();
}

static <T> CompletableFuture<T> readOnlyAsync(
RaftServer.Division server, Supplier<CompletableFuture<T>> query) throws IOException {
return server.readOnlyAsync(ClientId.emptyClientId(), RaftClientRequest.readRequestType().getRead(), query);
}

static <T> CompletableFuture<T> readOnlyAsyncPreferNonLinearizable(
RaftServer.Division server, Supplier<CompletableFuture<T>> query) throws IOException {
return server.readOnlyAsync(ClientId.emptyClientId(), RaftClientRequest.readRequestType(true).getRead(), query);
}

public static void assertReplyExact(int expectedCount, RaftClientReply reply) {
Assertions.assertTrue(reply.isSuccess());
final int retrieved = retrieve(reply);
Expand All @@ -157,6 +190,11 @@ static void assertReplyAtLeast(int minCount, RaftClientReply reply) {
() -> "retrieved = " + retrieved + " < minCount = " + minCount + ", reply=" + reply);
}

public static void assertLongAtLeast(int minCount, long retrieved) {
Assertions.assertTrue(retrieved >= minCount,
() -> "retrieved = " + retrieved + " < minCount = " + minCount);
}

/**
* CounterStateMachine support 3 operations
* 1. increment
Expand Down