diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index d2146a521f..7bb90b74ad 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -121,6 +121,11 @@ static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder(RaftGroupMemberI
return toRaftRpcRequestProtoBuilder(requestorId.getPeerId().toByteString(), replyId, requestorId.getGroupId());
}
+  /**
+   * For client requests: build the rpc header from the bytes of {@code clientId}
+   * together with the peer id and the group id of {@code serverId}.
+   */
+  static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder(ClientId clientId, RaftGroupMemberId serverId) {
+    return toRaftRpcRequestProtoBuilder(
+        clientId.toByteString(),
+        serverId.getPeerId(),
+        serverId.getGroupId());
+  }
+
/** For client requests. */
static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder(RaftClientRequest request) {
final RaftRpcRequestProto.Builder b = toRaftRpcRequestProtoBuilder(
@@ -208,13 +213,9 @@ static ByteBuffer toRaftClientRequestProtoByteBuffer(RaftClientRequest request)
}
static RaftClientRequestProto toRaftClientRequestProto(RaftClientRequest request) {
- return toRaftClientRequestProto(request, true);
- }
-
- static RaftClientRequestProto toRaftClientRequestProto(RaftClientRequest request, boolean withMsg) {
final RaftClientRequestProto.Builder b = RaftClientRequestProto.newBuilder()
.setRpcRequest(toRaftRpcRequestProtoBuilder(request));
- if (withMsg && request.getMessage() != null) {
+ if (request.getMessage() != null) {
b.setMessage(toClientMessageEntryProtoBuilder(request.getMessage()));
}
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java
index 84e3a1ed30..8efe3e5062 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java
@@ -24,12 +24,16 @@
import java.util.Collection;
import java.util.Objects;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
+import org.apache.ratis.proto.RaftProtos.ReadRequestTypeProto;
import org.apache.ratis.protocol.AdminAsynchronousProtocol;
import org.apache.ratis.protocol.AdminProtocol;
+import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.RaftClientAsynchronousProtocol;
import org.apache.ratis.protocol.RaftClientProtocol;
import org.apache.ratis.protocol.RaftGroup;
@@ -102,6 +106,21 @@ default RaftGroup getGroup() {
/** @return the {@link StateMachine} for this division. */
StateMachine getStateMachine();
+    /**
+     * Execute a local read-only query after applying the configured
+     * {@link RaftServerConfigKeys.Read.Option} consistency checks.
+     *
+     * <p>This API is intended for embedded users that already have a local
+     * server division and want to avoid serializing application read requests
+     * and responses through {@link org.apache.ratis.protocol.Message}. Remote
+     * clients should continue to use
+     * {@link org.apache.ratis.client.api.AsyncApi#sendReadOnly(org.apache.ratis.protocol.Message)}.
+     *
+     * <p>This default implementation does not support the operation and always throws
+     * {@link UnsupportedOperationException}.
+     *
+     * @param clientId the id of the client issuing the read.
+     * @param readRequestType the read request type of the query.
+     * @param query the supplier producing the future of the query result.
+     * @param <T> the result type of the query.
+     * @return a future of the query result.
+     * @throws IOException declared for implementations; not thrown by this default implementation.
+     * @throws UnsupportedOperationException if this operation is not supported (the default).
+     */
+    default <T> CompletableFuture<T> readOnlyAsync(ClientId clientId, ReadRequestTypeProto readRequestType,
+        Supplier<CompletableFuture<T>> query) throws IOException {
+      throw new UnsupportedOperationException("readOnlyAsync is not supported");
+    }
+
/** @return the raft log of this division. */
RaftLog getRaftLog();
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index c0e93338a6..04836a2199 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -37,12 +37,14 @@
import org.apache.ratis.proto.RaftProtos.RaftRpcRequestProto;
import org.apache.ratis.proto.RaftProtos.ReadIndexReplyProto;
import org.apache.ratis.proto.RaftProtos.ReadIndexRequestProto;
+import org.apache.ratis.proto.RaftProtos.ReadRequestTypeProto;
import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
import org.apache.ratis.proto.RaftProtos.RequestVoteReplyProto;
import org.apache.ratis.proto.RaftProtos.RequestVoteRequestProto;
import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
import org.apache.ratis.proto.RaftProtos.StartLeaderElectionReplyProto;
import org.apache.ratis.proto.RaftProtos.StartLeaderElectionRequestProto;
+import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.ClientInvocationId;
import org.apache.ratis.protocol.GroupInfoReply;
import org.apache.ratis.protocol.GroupInfoRequest;
@@ -1102,7 +1104,12 @@ private CompletableFuture sendReadIndexAsync(RaftClientRequ
if (leaderId == null) {
return JavaUtils.completeExceptionally(new ReadIndexException(getMemberId() + ": Leader is unknown."));
}
- final ReadIndexRequestProto request = toReadIndexRequestProto(clientRequest, getMemberId(), leaderId);
+ return sendReadIndexAsync(clientRequest.getClientId(), clientRequest.getType().getRead(), leaderId);
+ }
+
+ private CompletableFuture sendReadIndexAsync(
+ ClientId clientId, ReadRequestTypeProto readRequestType, RaftPeerId leaderId) {
+ final ReadIndexRequestProto request = toReadIndexRequestProto(clientId, readRequestType, getMemberId(), leaderId);
try {
return getServerRpc().async().readIndexAsync(request);
} catch (IOException e) {
@@ -1114,6 +1121,68 @@ private CompletableFuture getReadIndex(RaftClientRequest request, LeaderSt
return writeIndexCache.getWriteIndexFuture(request).thenCompose(leader::getReadIndex);
}
+  /**
+   * Convert a read-index reply future to a read-index future.
+   *
+   * @param readIndexReply the future of the reply to a read-index request.
+   * @return a future of the read index in the reply; it completes exceptionally with
+   *         a {@link ReadIndexException} (wrapped in a {@link CompletionException})
+   *         when the server reply is unsuccessful.
+   */
+  private CompletableFuture<Long> getReadIndex(CompletableFuture<ReadIndexReplyProto> readIndexReply) {
+    return readIndexReply.thenApply(reply -> {
+      if (!reply.getServerReply().getSuccess()) {
+        throw new CompletionException(new ReadIndexException(getId()
+            + ": Failed to get read index from the leader: " + reply));
+      }
+      return reply.getReadIndex();
+    });
+  }
+
+  /**
+   * Get the read index for a local read-only query.
+   * When this server has a leader state, the read index is obtained from it directly;
+   * otherwise, a read-index request is sent to the known leader.
+   *
+   * @return a future of the read index; it completes exceptionally with
+   *         a {@link ReadIndexException} when the leader is unknown.
+   */
+  private CompletableFuture<Long> getReadIndexForReadOnly(ClientId clientId, ReadRequestTypeProto readRequestType) {
+    final LeaderStateImpl leader = role.getLeaderState().orElse(null);
+    if (leader != null) {
+      // NOTE(review): null presumably means there is no write index to wait for -- confirm against LeaderStateImpl.
+      return leader.getReadIndex(null);
+    }
+
+    final RaftPeerId leaderId = getInfo().getLeaderId();
+    if (leaderId == null) {
+      return JavaUtils.completeExceptionally(new ReadIndexException(getMemberId() + ": Leader is unknown."));
+    }
+
+    return getReadIndex(sendReadIndexAsync(clientId, readRequestType, leaderId));
+  }
+
+  /**
+   * Check if this server can serve a read-only query as the leader.
+   *
+   * @param <T> the result type of the query.
+   * @return null if this server is the leader and the leader is ready;
+   *         otherwise, a future completed exceptionally with the corresponding exception.
+   */
+  private <T> CompletableFuture<T> checkLeaderStateForReadOnly() {
+    if (!getInfo().isLeader()) {
+      return JavaUtils.completeExceptionally(generateNotLeaderException());
+    }
+    if (!getInfo().isLeaderReady()) {
+      return JavaUtils.completeExceptionally(new LeaderNotReadyException(getMemberId()));
+    }
+    // null signals that the check has passed; callers must test for it.
+    return null;
+  }
+
+  /**
+   * Invoke the given query and return its future.
+   * Anything thrown by the query, including the {@link NullPointerException}
+   * raised when the query returns null, is converted to an exceptionally completed future.
+   *
+   * @param query the supplier producing the future of the query result.
+   * @param <T> the result type of the query.
+   * @return the future returned by the query, or an exceptionally completed future.
+   */
+  private static <T> CompletableFuture<T> supplyReadOnly(Supplier<CompletableFuture<T>> query) {
+    try {
+      return Objects.requireNonNull(query.get(), "query returned null");
+    } catch (Throwable t) {
+      return JavaUtils.completeExceptionally(t);
+    }
+  }
+
+  /**
+   * Run the given query locally after the read consistency checks.
+   *
+   * <p>When the request prefers non-linearizable reads or the read option is DEFAULT,
+   * this server must be a ready leader; otherwise, the returned future completes
+   * exceptionally and the query is not invoked.
+   * When the read option is LINEARIZABLE, the query is invoked only after a read index
+   * is obtained and {@code getReadRequests().waitToAdvance(readIndex)} has completed.
+   *
+   * @throws NullPointerException if any of the parameters is null.
+   * @throws IllegalStateException if the read option is neither DEFAULT nor LINEARIZABLE.
+   */
+  @Override
+  public <T> CompletableFuture<T> readOnlyAsync(ClientId clientId, ReadRequestTypeProto readRequestType,
+      Supplier<CompletableFuture<T>> query) throws IOException {
+    Objects.requireNonNull(clientId, "clientId == null");
+    Objects.requireNonNull(readRequestType, "readRequestType == null");
+    Objects.requireNonNull(query, "query == null");
+    assertLifeCycleState(LifeCycle.States.RUNNING);
+    if (readRequestType.getPreferNonLinearizable() || readOption == RaftServerConfigKeys.Read.Option.DEFAULT) {
+      // Leader-only read: fail fast without invoking the query when the leader check fails.
+      final CompletableFuture<T> reply = checkLeaderStateForReadOnly();
+      return reply != null ? reply : supplyReadOnly(query);
+    } else if (readOption == RaftServerConfigKeys.Read.Option.LINEARIZABLE) {
+      return getReadIndexForReadOnly(clientId, readRequestType)
+          .thenCompose(readIndex -> getReadRequests().waitToAdvance(readIndex))
+          .thenCompose(readIndex -> supplyReadOnly(query));
+    } else {
+      throw new IllegalStateException("Unexpected read option: " + readOption);
+    }
+  }
+
private CompletableFuture readAsync(RaftClientRequest request) {
if (request.getType().getRead().getPreferNonLinearizable()
|| readOption == RaftServerConfigKeys.Read.Option.DEFAULT) {
@@ -1135,14 +1204,7 @@ private CompletableFuture readAsync(RaftClientRequest request)
if (leader != null) {
replyFuture = getReadIndex(request, leader);
} else {
- replyFuture = sendReadIndexAsync(request).thenApply(reply -> {
- if (reply.getServerReply().getSuccess()) {
- return reply.getReadIndex();
- } else {
- throw new CompletionException(new ReadIndexException(getId() +
- ": Failed to get read index from the leader: " + reply));
- }
- });
+ replyFuture = getReadIndex(sendReadIndexAsync(request));
}
return replyFuture
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
index 19d4ce6a75..f2fd142774 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
@@ -20,7 +20,7 @@
import org.apache.ratis.client.impl.ClientProtoUtils;
import org.apache.ratis.proto.RaftProtos.*;
import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult;
-import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
@@ -115,10 +115,13 @@ static InstallSnapshotReplyProto toInstallSnapshotReplyProto(
}
+ /**
+ * Build a {@link ReadIndexRequestProto}.
+ * The embedded client request sets only the rpc header, built from {@code clientId} and {@code serverId},
+ * and the given {@code readRequestType}.
+ * The server request header is built from {@code serverId} and {@code leaderId}.
+ */
static ReadIndexRequestProto toReadIndexRequestProto(
- RaftClientRequest clientRequest, RaftGroupMemberId requestorId, RaftPeerId replyId) {
+ ClientId clientId, ReadRequestTypeProto readRequestType, RaftGroupMemberId serverId, RaftPeerId leaderId) {
+ final RaftClientRequestProto.Builder clientRequestBuilder = RaftClientRequestProto.newBuilder()
+ .setRpcRequest(ClientProtoUtils.toRaftRpcRequestProtoBuilder(clientId, serverId))
+ .setRead(readRequestType);
return ReadIndexRequestProto.newBuilder()
- .setServerRequest(ClientProtoUtils.toRaftRpcRequestProtoBuilder(requestorId, replyId))
- .setClientRequest(ClientProtoUtils.toRaftClientRequestProto(clientRequest, false))
+ .setServerRequest(ClientProtoUtils.toRaftRpcRequestProtoBuilder(serverId, leaderId))
+ .setClientRequest(clientRequestBuilder)
.build();
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java b/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java
index 09781b546e..47670f4a3e 100644
--- a/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java
@@ -41,15 +41,21 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.ratis.ReadOnlyRequestTests.CounterStateMachine;
import static org.apache.ratis.ReadOnlyRequestTests.INCREMENT;
import static org.apache.ratis.ReadOnlyRequestTests.QUERY;
import static org.apache.ratis.ReadOnlyRequestTests.WAIT_AND_INCREMENT;
+import static org.apache.ratis.ReadOnlyRequestTests.assertLongAtLeast;
import static org.apache.ratis.ReadOnlyRequestTests.assertOption;
import static org.apache.ratis.ReadOnlyRequestTests.assertReplyAtLeast;
import static org.apache.ratis.ReadOnlyRequestTests.assertReplyExact;
+import static org.apache.ratis.ReadOnlyRequestTests.getCount;
+import static org.apache.ratis.ReadOnlyRequestTests.readOnlyAsync;
+import static org.apache.ratis.ReadOnlyRequestTests.readOnlyAsyncPreferNonLinearizable;
import static org.apache.ratis.server.RaftServerConfigKeys.Read.Option.LINEARIZABLE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
@@ -150,6 +156,7 @@ static void runTestFollowerLinearizableRead(C cluste
final int n = 100;
final List f0Replies = new ArrayList<>(n);
final List f1Replies = new ArrayList<>(n);
+ final List> f0LocalReplies = new ArrayList<>(n);
try (RaftClient client = cluster.createClient(leaderId);
RaftClient c0 = cluster.createClient(f0);
RaftClient c1 = cluster.createClient(f1);
@@ -160,12 +167,24 @@ static void runTestFollowerLinearizableRead(C cluste
f0Replies.add(new Reply(count, c0.async().sendReadOnly(QUERY, f0)));
f1Replies.add(new Reply(count, c1.async().sendReadOnly(QUERY, f1)));
+ f0LocalReplies.add(readOnlyAsync(
+ followers.get(0), () -> CompletableFuture.completedFuture(getCount(followers.get(0)))));
}
for (int i = 0; i < n; i++) {
f0Replies.get(i).assertAtLeast();
f1Replies.get(i).assertAtLeast();
+ assertLongAtLeast(i + 1, f0LocalReplies.get(i).join());
}
+
+ final AtomicBoolean callbackInvoked = new AtomicBoolean();
+ final CompletableFuture preferNonLinearizable = readOnlyAsyncPreferNonLinearizable(
+ followers.get(0), () -> {
+ callbackInvoked.set(true);
+ return CompletableFuture.completedFuture(getCount(followers.get(0)));
+ });
+ Assertions.assertThrows(CompletionException.class, preferNonLinearizable::join);
+ Assertions.assertFalse(callbackInvoked.get());
}
}
@@ -285,4 +304,4 @@ static void runTestReadAfterWrite(C cluster) throws
assertReplyAtLeast(2, asyncReply.join());
}
}
-}
\ No newline at end of file
+}
diff --git a/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java b/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java
index 94e9433b15..49c189b8f8 100644
--- a/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java
@@ -20,7 +20,9 @@
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.exceptions.RaftRetryFailureException;
@@ -40,9 +42,13 @@
import org.junit.jupiter.api.Test;
import org.slf4j.event.Level;
+import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
public abstract class ReadOnlyRequestTests
extends BaseTest
@@ -81,15 +87,28 @@ public void testReadOnly() throws Exception {
static void runTestReadOnly(C cluster) throws Exception {
try {
- RaftTestUtil.waitForLeader(cluster);
- final RaftPeerId leaderId = cluster.getLeader().getId();
+ final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
+ final RaftPeerId leaderId = leader.getId();
try (final RaftClient client = cluster.createClient(leaderId)) {
for (int i = 1; i <= 10; i++) {
assertReplyExact(i, client.io().send(INCREMENT));
assertReplyExact(i, client.io().sendReadOnly(QUERY));
+ Assertions.assertEquals(i, readOnlyAsync(
+ leader, () -> CompletableFuture.completedFuture(getCount(leader))).get());
}
}
+
+ if (RaftServerConfigKeys.Read.option(cluster.getProperties()) == RaftServerConfigKeys.Read.Option.DEFAULT) {
+ final RaftServer.Division follower = cluster.getFollowers().get(0);
+ final AtomicBoolean callbackInvoked = new AtomicBoolean();
+ final CompletableFuture read = readOnlyAsync(follower, () -> {
+ callbackInvoked.set(true);
+ return CompletableFuture.completedFuture(getCount(follower));
+ });
+ Assertions.assertThrows(CompletionException.class, read::join);
+ Assertions.assertFalse(callbackInvoked.get());
+ }
} finally {
cluster.shutdown();
}
@@ -144,6 +163,20 @@ static int retrieve(RaftClientReply reply) {
return Integer.parseInt(reply.getMessage().getContent().toString(StandardCharsets.UTF_8));
}
+  /** @return the value of {@code getCount()} from the {@link CounterStateMachine} of the given server. */
+  static long getCount(RaftServer.Division server) {
+    final CounterStateMachine counter = (CounterStateMachine) server.getStateMachine();
+    return counter.getCount();
+  }
+
+  /**
+   * Run the given query by {@code server.readOnlyAsync(..)}
+   * with an empty client id and the read type from {@code RaftClientRequest.readRequestType()}.
+   *
+   * @param <T> the result type of the query.
+   */
+  static <T> CompletableFuture<T> readOnlyAsync(
+      RaftServer.Division server, Supplier<CompletableFuture<T>> query) throws IOException {
+    return server.readOnlyAsync(ClientId.emptyClientId(), RaftClientRequest.readRequestType().getRead(), query);
+  }
+
+  /**
+   * Run the given query by {@code server.readOnlyAsync(..)}
+   * with an empty client id and the read type from {@code RaftClientRequest.readRequestType(true)}.
+   *
+   * @param <T> the result type of the query.
+   */
+  static <T> CompletableFuture<T> readOnlyAsyncPreferNonLinearizable(
+      RaftServer.Division server, Supplier<CompletableFuture<T>> query) throws IOException {
+    return server.readOnlyAsync(ClientId.emptyClientId(), RaftClientRequest.readRequestType(true).getRead(), query);
+  }
+
public static void assertReplyExact(int expectedCount, RaftClientReply reply) {
Assertions.assertTrue(reply.isSuccess());
final int retrieved = retrieve(reply);
@@ -157,6 +190,11 @@ static void assertReplyAtLeast(int minCount, RaftClientReply reply) {
() -> "retrieved = " + retrieved + " < minCount = " + minCount + ", reply=" + reply);
}
+  /** Assert that {@code retrieved} is not less than {@code minCount}. */
+  public static void assertLongAtLeast(int minCount, long retrieved) {
+    final boolean atLeast = minCount <= retrieved;
+    Assertions.assertTrue(atLeast, () -> "retrieved = " + retrieved + " < minCount = " + minCount);
+  }
+
/**
* CounterStateMachine support 3 operations
* 1. increment