From 3bec49d16d6001abfe1f0184b5354402506686d1 Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Mon, 4 May 2026 22:01:26 +0800 Subject: [PATCH 1/2] RATIS-2509. Add typed local read-only API --- .../ratis/client/impl/ClientProtoUtils.java | 11 +-- .../org/apache/ratis/server/RaftServer.java | 19 +++++ .../ratis/server/impl/RaftServerImpl.java | 80 ++++++++++++++++--- .../ratis/server/impl/ServerProtoUtils.java | 11 ++- .../apache/ratis/LinearizableReadTests.java | 9 ++- .../apache/ratis/ReadOnlyRequestTests.java | 37 ++++++++- 6 files changed, 146 insertions(+), 21 deletions(-) diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java index d2146a521f..7bb90b74ad 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java @@ -121,6 +121,11 @@ static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder(RaftGroupMemberI return toRaftRpcRequestProtoBuilder(requestorId.getPeerId().toByteString(), replyId, requestorId.getGroupId()); } + /** For client requests. */ + static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder(ClientId clientId, RaftGroupMemberId serverId) { + return toRaftRpcRequestProtoBuilder(clientId.toByteString(), serverId.getPeerId(), serverId.getGroupId()); + } + /** For client requests. */ static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder(RaftClientRequest request) { final RaftRpcRequestProto.Builder b = toRaftRpcRequestProtoBuilder( @@ -208,13 +213,9 @@ static ByteBuffer toRaftClientRequestProtoByteBuffer(RaftClientRequest request) } static RaftClientRequestProto toRaftClientRequestProto(RaftClientRequest request) { - return toRaftClientRequestProto(request, true); - } - - static RaftClientRequestProto toRaftClientRequestProto(RaftClientRequest request, boolean withMsg) { final RaftClientRequestProto.Builder b = RaftClientRequestProto.newBuilder() .setRpcRequest(toRaftRpcRequestProtoBuilder(request)); - if (withMsg && request.getMessage() != null) { + if (request.getMessage() != null) { b.setMessage(toClientMessageEntryProtoBuilder(request.getMessage())); } diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java index 84e3a1ed30..8efe3e5062 100644 --- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java +++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java @@ -24,12 +24,16 @@ import java.util.Collection; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; import org.apache.ratis.conf.Parameters; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.proto.RaftProtos.CommitInfoProto; import org.apache.ratis.proto.RaftProtos.RaftPeerRole; +import org.apache.ratis.proto.RaftProtos.ReadRequestTypeProto; import org.apache.ratis.protocol.AdminAsynchronousProtocol; import org.apache.ratis.protocol.AdminProtocol; +import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.RaftClientAsynchronousProtocol; import org.apache.ratis.protocol.RaftClientProtocol; import org.apache.ratis.protocol.RaftGroup; @@ -102,6 +106,21 @@ default RaftGroup getGroup() { /** @return the {@link StateMachine} for this division. */ StateMachine getStateMachine(); + /** + * Execute a local read-only query after applying the configured + * {@link RaftServerConfigKeys.Read.Option} consistency checks. + * + *

This API is intended for embedded users that already have a local + * server division and want to avoid serializing application read requests + * and responses through {@link org.apache.ratis.protocol.Message}. Remote + * clients should continue to use + * {@link org.apache.ratis.client.api.AsyncApi#sendReadOnly(org.apache.ratis.protocol.Message)}. + */ + default CompletableFuture readOnlyAsync(ClientId clientId, ReadRequestTypeProto readRequestType, + Supplier> query) throws IOException { + throw new UnsupportedOperationException("readOnlyAsync is not supported"); + } + /** @return the raft log of this division. */ RaftLog getRaftLog(); diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index c0e93338a6..260802cce5 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -37,12 +37,14 @@ import org.apache.ratis.proto.RaftProtos.RaftRpcRequestProto; import org.apache.ratis.proto.RaftProtos.ReadIndexReplyProto; import org.apache.ratis.proto.RaftProtos.ReadIndexRequestProto; +import org.apache.ratis.proto.RaftProtos.ReadRequestTypeProto; import org.apache.ratis.proto.RaftProtos.ReplicationLevel; import org.apache.ratis.proto.RaftProtos.RequestVoteReplyProto; import org.apache.ratis.proto.RaftProtos.RequestVoteRequestProto; import org.apache.ratis.proto.RaftProtos.RoleInfoProto; import org.apache.ratis.proto.RaftProtos.StartLeaderElectionReplyProto; import org.apache.ratis.proto.RaftProtos.StartLeaderElectionRequestProto; +import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.ClientInvocationId; import org.apache.ratis.protocol.GroupInfoReply; import org.apache.ratis.protocol.GroupInfoRequest; @@ -1102,7 +1104,12 @@ private CompletableFuture sendReadIndexAsync(RaftClientRequ if (leaderId == null) { return JavaUtils.completeExceptionally(new ReadIndexException(getMemberId() + ": Leader is unknown.")); } - final ReadIndexRequestProto request = toReadIndexRequestProto(clientRequest, getMemberId(), leaderId); + return sendReadIndexAsync(clientRequest.getClientId(), clientRequest.getType().getRead(), leaderId); + } + + private CompletableFuture sendReadIndexAsync( + ClientId clientId, ReadRequestTypeProto readRequestType, RaftPeerId leaderId) { + final ReadIndexRequestProto request = toReadIndexRequestProto(clientId, readRequestType, getMemberId(), leaderId); try { return getServerRpc().async().readIndexAsync(request); } catch (IOException e) { @@ -1114,6 +1121,68 @@ private CompletableFuture getReadIndex(RaftClientRequest request, LeaderSt return writeIndexCache.getWriteIndexFuture(request).thenCompose(leader::getReadIndex); } + private CompletableFuture getReadIndex(CompletableFuture readIndexReply) { + return readIndexReply.thenApply(reply -> { + if (reply.getServerReply().getSuccess()) { + return reply.getReadIndex(); + } else { + throw new CompletionException(new ReadIndexException(getId() + + ": Failed to get read index from the leader: " + reply)); + } + }); + } + + private CompletableFuture getReadIndexForReadOnly(ClientId clientId, ReadRequestTypeProto readRequestType) { + final LeaderStateImpl leader = role.getLeaderState().orElse(null); + if (leader != null) { + return leader.getReadIndex(null); + } + + final RaftPeerId leaderId = getInfo().getLeaderId(); + if (leaderId == null) { + return JavaUtils.completeExceptionally(new ReadIndexException(getMemberId() + ": Leader is unknown.")); + } + + return getReadIndex(sendReadIndexAsync(clientId, readRequestType, leaderId)); + } + + private CompletableFuture checkLeaderStateForReadOnly() { + if (!getInfo().isLeader()) { + return JavaUtils.completeExceptionally(generateNotLeaderException()); + } + if (!getInfo().isLeaderReady()) { + return JavaUtils.completeExceptionally(new LeaderNotReadyException(getMemberId())); + } + return null; + } + + private static CompletableFuture supplyReadOnly(Supplier> query) { + try { + return Objects.requireNonNull(query.get(), "query returned null"); + } catch (Throwable t) { + return JavaUtils.completeExceptionally(t); + } + } + + @Override + public CompletableFuture readOnlyAsync(ClientId clientId, ReadRequestTypeProto readRequestType, + Supplier> query) throws IOException { + Objects.requireNonNull(clientId, "clientId == null"); + Objects.requireNonNull(readRequestType, "readRequestType == null"); + Objects.requireNonNull(query, "query == null"); + assertLifeCycleState(LifeCycle.States.RUNNING); + if (readOption == RaftServerConfigKeys.Read.Option.DEFAULT) { + final CompletableFuture reply = checkLeaderStateForReadOnly(); + return reply != null ? reply : supplyReadOnly(query); + } else if (readOption == RaftServerConfigKeys.Read.Option.LINEARIZABLE) { + return getReadIndexForReadOnly(clientId, readRequestType) + .thenCompose(readIndex -> getReadRequests().waitToAdvance(readIndex)) + .thenCompose(readIndex -> supplyReadOnly(query)); + } else { + throw new IllegalStateException("Unexpected read option: " + readOption); + } + } + private CompletableFuture readAsync(RaftClientRequest request) { if (request.getType().getRead().getPreferNonLinearizable() || readOption == RaftServerConfigKeys.Read.Option.DEFAULT) { @@ -1135,14 +1204,7 @@ private CompletableFuture readAsync(RaftClientRequest request) if (leader != null) { replyFuture = getReadIndex(request, leader); } else { - replyFuture = sendReadIndexAsync(request).thenApply(reply -> { - if (reply.getServerReply().getSuccess()) { - return reply.getReadIndex(); - } else { - throw new CompletionException(new ReadIndexException(getId() + - ": Failed to get read index from the leader: " + reply)); - } - }); + replyFuture = getReadIndex(sendReadIndexAsync(request)); } return replyFuture diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java index 19d4ce6a75..f2fd142774 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java @@ -20,7 +20,7 @@ import org.apache.ratis.client.impl.ClientProtoUtils; import org.apache.ratis.proto.RaftProtos.*; import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult; -import org.apache.ratis.protocol.RaftClientRequest; +import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.RaftGroupMemberId; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; @@ -115,10 +115,13 @@ static InstallSnapshotReplyProto toInstallSnapshotReplyProto( } static ReadIndexRequestProto toReadIndexRequestProto( - RaftClientRequest clientRequest, RaftGroupMemberId requestorId, RaftPeerId replyId) { + ClientId clientId, ReadRequestTypeProto readRequestType, RaftGroupMemberId serverId, RaftPeerId leaderId) { + final RaftClientRequestProto.Builder clientRequestBuilder = RaftClientRequestProto.newBuilder() + .setRpcRequest(ClientProtoUtils.toRaftRpcRequestProtoBuilder(clientId, serverId)) + .setRead(readRequestType); return ReadIndexRequestProto.newBuilder() - .setServerRequest(ClientProtoUtils.toRaftRpcRequestProtoBuilder(requestorId, replyId)) - .setClientRequest(ClientProtoUtils.toRaftClientRequestProto(clientRequest, false)) + .setServerRequest(ClientProtoUtils.toRaftRpcRequestProtoBuilder(serverId, leaderId)) + .setClientRequest(clientRequestBuilder) .build(); } diff --git a/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java b/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java index 09781b546e..7c8c0176f2 100644 --- a/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java @@ -47,9 +47,12 @@ import static org.apache.ratis.ReadOnlyRequestTests.INCREMENT; import static org.apache.ratis.ReadOnlyRequestTests.QUERY; import static org.apache.ratis.ReadOnlyRequestTests.WAIT_AND_INCREMENT; +import static org.apache.ratis.ReadOnlyRequestTests.assertLongAtLeast; import static org.apache.ratis.ReadOnlyRequestTests.assertOption; import static org.apache.ratis.ReadOnlyRequestTests.assertReplyAtLeast; import static org.apache.ratis.ReadOnlyRequestTests.assertReplyExact; +import static org.apache.ratis.ReadOnlyRequestTests.getCount; +import static org.apache.ratis.ReadOnlyRequestTests.readOnlyAsync; import static org.apache.ratis.server.RaftServerConfigKeys.Read.Option.LINEARIZABLE; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertSame; @@ -150,6 +153,7 @@ static void runTestFollowerLinearizableRead(C cluste final int n = 100; final List f0Replies = new ArrayList<>(n); final List f1Replies = new ArrayList<>(n); + final List> f0LocalReplies = new ArrayList<>(n); try (RaftClient client = cluster.createClient(leaderId); RaftClient c0 = cluster.createClient(f0); RaftClient c1 = cluster.createClient(f1); @@ -160,11 +164,14 @@ static void runTestFollowerLinearizableRead(C cluste f0Replies.add(new Reply(count, c0.async().sendReadOnly(QUERY, f0))); f1Replies.add(new Reply(count, c1.async().sendReadOnly(QUERY, f1))); + f0LocalReplies.add(readOnlyAsync( + followers.get(0), () -> CompletableFuture.completedFuture(getCount(followers.get(0))))); } for (int i = 0; i < n; i++) { f0Replies.get(i).assertAtLeast(); f1Replies.get(i).assertAtLeast(); + assertLongAtLeast(i + 1, f0LocalReplies.get(i).join()); } } } @@ -285,4 +292,4 @@ static void runTestReadAfterWrite(C cluster) throws assertReplyAtLeast(2, asyncReply.join()); } } -} \ No newline at end of file +} diff --git a/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java b/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java index 94e9433b15..aa966644f0 100644 --- a/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java @@ -20,7 +20,9 @@ import org.apache.ratis.client.RaftClient; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.Message; +import org.apache.ratis.protocol.RaftClientRequest; import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.protocol.exceptions.RaftRetryFailureException; @@ -40,9 +42,13 @@ import org.junit.jupiter.api.Test; import org.slf4j.event.Level; +import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; public abstract class ReadOnlyRequestTests extends BaseTest @@ -81,15 +87,28 @@ public void testReadOnly() throws Exception { static void runTestReadOnly(C cluster) throws Exception { try { - RaftTestUtil.waitForLeader(cluster); - final RaftPeerId leaderId = cluster.getLeader().getId(); + final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster); + final RaftPeerId leaderId = leader.getId(); try (final RaftClient client = cluster.createClient(leaderId)) { for (int i = 1; i <= 10; i++) { assertReplyExact(i, client.io().send(INCREMENT)); assertReplyExact(i, client.io().sendReadOnly(QUERY)); + Assertions.assertEquals(i, readOnlyAsync( + leader, () -> CompletableFuture.completedFuture(getCount(leader))).get()); } } + + if (RaftServerConfigKeys.Read.option(cluster.getProperties()) == RaftServerConfigKeys.Read.Option.DEFAULT) { + final RaftServer.Division follower = cluster.getFollowers().get(0); + final AtomicBoolean callbackInvoked = new AtomicBoolean(); + final CompletableFuture read = readOnlyAsync(follower, () -> { + callbackInvoked.set(true); + return CompletableFuture.completedFuture(getCount(follower)); + }); + Assertions.assertThrows(CompletionException.class, read::join); + Assertions.assertFalse(callbackInvoked.get()); + } } finally { cluster.shutdown(); } @@ -144,6 +163,15 @@ static int retrieve(RaftClientReply reply) { return Integer.parseInt(reply.getMessage().getContent().toString(StandardCharsets.UTF_8)); } + static long getCount(RaftServer.Division server) { + return ((CounterStateMachine) server.getStateMachine()).getCount(); + } + + static CompletableFuture readOnlyAsync( + RaftServer.Division server, Supplier> query) throws IOException { + return server.readOnlyAsync(ClientId.emptyClientId(), RaftClientRequest.readRequestType().getRead(), query); + } + public static void assertReplyExact(int expectedCount, RaftClientReply reply) { Assertions.assertTrue(reply.isSuccess()); final int retrieved = retrieve(reply); @@ -157,6 +185,11 @@ static void assertReplyAtLeast(int minCount, RaftClientReply reply) { () -> "retrieved = " + retrieved + " < minCount = " + minCount + ", reply=" + reply); } + public static void assertLongAtLeast(int minCount, long retrieved) { + Assertions.assertTrue(retrieved >= minCount, + () -> "retrieved = " + retrieved + " < minCount = " + minCount); + } + /** * CounterStateMachine support 3 operations * 1. increment From d5e482f92738c0e61fea4da658750a0426468167 Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Wed, 6 May 2026 14:49:45 +0800 Subject: [PATCH 2/2] RATIS-2509. Honor preferNonLinearizable in typed reads --- .../org/apache/ratis/server/impl/RaftServerImpl.java | 2 +- .../java/org/apache/ratis/LinearizableReadTests.java | 12 ++++++++++++ .../java/org/apache/ratis/ReadOnlyRequestTests.java | 5 +++++ 3 files changed, 18 insertions(+), 1 deletion(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 260802cce5..04836a2199 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -1171,7 +1171,7 @@ public CompletableFuture readOnlyAsync(ClientId clientId, ReadRequestType Objects.requireNonNull(readRequestType, "readRequestType == null"); Objects.requireNonNull(query, "query == null"); assertLifeCycleState(LifeCycle.States.RUNNING); - if (readOption == RaftServerConfigKeys.Read.Option.DEFAULT) { + if (readRequestType.getPreferNonLinearizable() || readOption == RaftServerConfigKeys.Read.Option.DEFAULT) { final CompletableFuture reply = checkLeaderStateForReadOnly(); return reply != null ? reply : supplyReadOnly(query); } else if (readOption == RaftServerConfigKeys.Read.Option.LINEARIZABLE) { diff --git a/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java b/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java index 7c8c0176f2..47670f4a3e 100644 --- a/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java @@ -41,7 +41,9 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.ratis.ReadOnlyRequestTests.CounterStateMachine; import static org.apache.ratis.ReadOnlyRequestTests.INCREMENT; @@ -53,6 +55,7 @@ import static org.apache.ratis.ReadOnlyRequestTests.assertReplyExact; import static org.apache.ratis.ReadOnlyRequestTests.getCount; import static org.apache.ratis.ReadOnlyRequestTests.readOnlyAsync; +import static org.apache.ratis.ReadOnlyRequestTests.readOnlyAsyncPreferNonLinearizable; import static org.apache.ratis.server.RaftServerConfigKeys.Read.Option.LINEARIZABLE; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertSame; @@ -173,6 +176,15 @@ static void runTestFollowerLinearizableRead(C cluste f1Replies.get(i).assertAtLeast(); assertLongAtLeast(i + 1, f0LocalReplies.get(i).join()); } + + final AtomicBoolean callbackInvoked = new AtomicBoolean(); + final CompletableFuture preferNonLinearizable = readOnlyAsyncPreferNonLinearizable( + followers.get(0), () -> { + callbackInvoked.set(true); + return CompletableFuture.completedFuture(getCount(followers.get(0))); + }); + Assertions.assertThrows(CompletionException.class, preferNonLinearizable::join); + Assertions.assertFalse(callbackInvoked.get()); } } diff --git a/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java b/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java index aa966644f0..49c189b8f8 100644 --- a/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java @@ -172,6 +172,11 @@ static CompletableFuture readOnlyAsync( return server.readOnlyAsync(ClientId.emptyClientId(), RaftClientRequest.readRequestType().getRead(), query); } + static CompletableFuture readOnlyAsyncPreferNonLinearizable( + RaftServer.Division server, Supplier> query) throws IOException { + return server.readOnlyAsync(ClientId.emptyClientId(), RaftClientRequest.readRequestType(true).getRead(), query); + } + public static void assertReplyExact(int expectedCount, RaftClientReply reply) { Assertions.assertTrue(reply.isSuccess()); final int retrieved = retrieve(reply);