From 97411be509da526271d54727716032984c4b6fd0 Mon Sep 17 00:00:00 2001 From: slfan1989 Date: Tue, 5 May 2026 15:58:38 +0800 Subject: [PATCH] RATIS-2514. Fix flaky TestReadOnlyRequestWithGrpc.testReadAfterWrite. --- .../apache/ratis/ReadOnlyRequestTests.java | 35 +++++++++++++++---- 1 file changed, 28 insertions(+), 7 deletions(-) diff --git a/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java b/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java index a46c996108..a4861c42a9 100644 --- a/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java @@ -42,6 +42,7 @@ import org.slf4j.event.Level; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -299,20 +300,40 @@ private void testReadAfterWriteImpl(CLUSTER cluster) throws Exception { Assertions.assertEquals(1, retrieve(blockReply)); // test asynchronous read-after-write - client.async().send(incrementMessage); - client.async().sendReadAfterWrite(queryMessage).thenAccept(reply -> { - Assertions.assertEquals(2, retrieve(reply)); - }); + client.async().send(incrementMessage).get(); + Assertions.assertEquals(2, retrieve(client.async().sendReadAfterWrite(queryMessage).get())); + // Keep the async write workload, but wait for the writes to complete before issuing reads. + // Otherwise, the following reads may observe different subsets of concurrent writes. + final List> writes = new ArrayList<>(); for (int i = 0; i < 20; i++) { - client.async().send(incrementMessage); + writes.add(client.async().send(incrementMessage)); } + CompletableFuture.allOf(writes.toArray(new CompletableFuture[0])).get(); + for (CompletableFuture write : writes) { + Assertions.assertTrue(write.get().isSuccess()); + } + final CompletableFuture linearizable = client.async().sendReadOnly(queryMessage); final CompletableFuture readAfterWrite = client.async().sendReadAfterWrite(queryMessage); CompletableFuture.allOf(linearizable, readAfterWrite).get(); - // read-after-write is more consistent than linearizable read - Assertions.assertTrue(retrieve(readAfterWrite.get()) >= retrieve(linearizable.get())); + // Both reads should observe the completed prior writes. Do not compare their freshness + // since these two reads are concurrent and may have different linearization points. + Assertions.assertEquals(22, retrieve(linearizable.get())); + Assertions.assertEquals(22, retrieve(readAfterWrite.get())); + + // Wait for followers to catch up before shutting down the cluster, so that outstanding + // AppendEntries references are released before leak detection runs. + final long leaderAppliedIndex = cluster.getLeader().getInfo().getLastAppliedIndex(); + RaftTestUtil.waitFor(() -> { + for (RaftServer.Division server : cluster.iterateDivisions()) { + if (server.getInfo().getLastAppliedIndex() < leaderAppliedIndex) { + return false; + } + } + return true; + }, 100, 10_000); } }