diff --git a/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java b/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java index 94ca2b822..6367b77bb 100644 --- a/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java +++ b/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java @@ -603,13 +603,15 @@ protected void setState() { protected void enactState() { if(change != null) { + // RTL3d1: implement the state change first, so channel states are updated + // before listeners observe the new connection state + states.get(stateIndication.state).enact(stateIndication, change); + if(change.current != change.previous) { /* broadcast currentState change */ connection.onConnectionStateChange(change); } - /* implement the state change */ - states.get(stateIndication.state).enact(stateIndication, change); if(currentState.terminal) { clearTransport(); } diff --git a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java index 5350f0515..5ff53c6f4 100644 --- a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java +++ b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java @@ -7,9 +7,9 @@ import io.ably.lib.realtime.ChannelEvent; import io.ably.lib.realtime.ChannelState; import io.ably.lib.realtime.ChannelStateListener; +import io.ably.lib.realtime.ConnectionEvent; import io.ably.lib.realtime.CompletionListener; import io.ably.lib.realtime.ConnectionState; -import io.ably.lib.realtime.ConnectionStateListener; import io.ably.lib.test.common.Helpers; import io.ably.lib.test.common.Helpers.ChannelWaiter; import io.ably.lib.test.common.Helpers.ConnectionWaiter; @@ -37,6 +37,7 @@ import java.util.HashMap; import java.util.List; import java.util.Set; +import java.util.stream.Collectors; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; @@ -2128,13 +2129,10 @@ public void channel_attach_retry_failed() { ably.connection.connectionManager.requestState(ConnectionState.suspended); channelWaiter.waitFor(ChannelState.suspended); - /* Reconnect and immediately block transport's send(). This should fail channel reattach */ - ably.connection.once(ConnectionState.connected, new ConnectionStateListener() { - @Override - public void onConnectionStateChanged(ConnectionStateChange state) { - mockTransport.blockSend(); - } - }); + /* Block transport's send() before reconnecting so that channel reattach fails. + * (Channel transitions to attaching before the connected event fires, so send + * must be blocked before requestState is called.) */ + mockTransport.blockSend(); ably.connection.connectionManager.requestState(ConnectionState.connected); /* Channel should move to attaching state */ @@ -2594,7 +2592,21 @@ public void release_should_not_prevent_graceful_test_end() throws Exception { Thread.sleep(100); } - assertFalse("Found orphan Timer threads", timers.stream().anyMatch(Thread::isAlive)); + String activeTimers = timers.stream() + .filter(Thread::isAlive) + .map(this::stringStackTrace) + .collect(Collectors.joining("\n\n")); + + assertFalse( + "Found orphan Timer threads:\n" + activeTimers, + timers.stream().anyMatch(Thread::isAlive) + ); + } + + private String stringStackTrace(Thread thread) { + return Arrays.stream(thread.getStackTrace()) + .map(StackTraceElement::toString) + .collect(Collectors.joining("\n")); } /** @@ -2640,6 +2652,56 @@ public void channel_get_objects_throws_exception() throws AblyException { } } + /* + * @spec RTL3d1 + * + * Given: an attached channel + * When: the connection drops and then reconnects + * Then: at the moment the connection fires onConnected, the channel state is already attaching + */ + @Test + public void channel_state_is_attaching_on_reconnected() throws AblyException { + ClientOptions opts = createOptions(testVars.keys[0].keyStr); + AblyRealtime ably = new AblyRealtime(opts); + ConnectionWaiter connectionWaiter = new ConnectionWaiter(ably.connection); + + /* attach channel and wait for it to be fully attached */ + final Channel channel = ably.channels.get("channel_state_is_attaching_on_reconnected_" + testParams.name); + ChannelWaiter channelWaiter = new ChannelWaiter(channel); + channel.attach(); + channelWaiter.waitFor(ChannelState.attached); + assertEquals("Channel should be attached", ChannelState.attached, channel.state); + + /* capture channel state at the moment the connection becomes connected */ + final ChannelState[] channelStateOnConnected = new ChannelState[1]; + + /* drop the connection */ + new Helpers.MutableConnectionManager(ably).disconnectAndSuppressRetries(); + connectionWaiter.waitFor(ConnectionState.disconnected); + assertEquals("Connection should be disconnected", ConnectionState.disconnected, ably.connection.state); + + Helpers.CompletionWaiter completionWaiter = new Helpers.CompletionWaiter(); + + ably.connection.on(ConnectionEvent.connected, stateChange -> { + synchronized (channelStateOnConnected) { + channelStateOnConnected[0] = channel.state; + completionWaiter.onSuccess(); + } + }); + + /* reconnect */ + ably.connection.connect(); + + completionWaiter.waitFor(); + + /* verify channel was attaching at the moment the connection became connected */ + assertEquals( + "Channel state should be attaching when connection fires connected", + ChannelState.attaching, + channelStateOnConnected[0] + ); + } + static class DetachingProtocolListener implements DebugOptions.RawProtocolListener { public Channel theChannel;