Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -603,13 +603,15 @@ protected void setState() {

protected void enactState() {
if(change != null) {
// RTL3d1: implement the state change first, so channel states are updated
// before listeners observe the new connection state
states.get(stateIndication.state).enact(stateIndication, change);

if(change.current != change.previous) {
/* broadcast currentState change */
connection.onConnectionStateChange(change);
}

/* implement the state change */
states.get(stateIndication.state).enact(stateIndication, change);
if(currentState.terminal) {
clearTransport();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
import io.ably.lib.realtime.ChannelEvent;
import io.ably.lib.realtime.ChannelState;
import io.ably.lib.realtime.ChannelStateListener;
import io.ably.lib.realtime.ConnectionEvent;
import io.ably.lib.realtime.CompletionListener;
import io.ably.lib.realtime.ConnectionState;
import io.ably.lib.realtime.ConnectionStateListener;
import io.ably.lib.test.common.Helpers;
import io.ably.lib.test.common.Helpers.ChannelWaiter;
import io.ably.lib.test.common.Helpers.ConnectionWaiter;
Expand Down Expand Up @@ -37,6 +37,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -2128,13 +2129,10 @@ public void channel_attach_retry_failed() {
ably.connection.connectionManager.requestState(ConnectionState.suspended);
channelWaiter.waitFor(ChannelState.suspended);

/* Reconnect and immediately block transport's send(). This should fail channel reattach */
ably.connection.once(ConnectionState.connected, new ConnectionStateListener() {
@Override
public void onConnectionStateChanged(ConnectionStateChange state) {
mockTransport.blockSend();
}
});
/* Block transport's send() before reconnecting so that channel reattach fails.
* (Channel transitions to attaching before the connected event fires, so send
* must be blocked before requestState is called.) */
mockTransport.blockSend();
ably.connection.connectionManager.requestState(ConnectionState.connected);

/* Channel should move to attaching state */
Expand Down Expand Up @@ -2594,7 +2592,21 @@ public void release_should_not_prevent_graceful_test_end() throws Exception {
Thread.sleep(100);
}

assertFalse("Found orphan Timer threads", timers.stream().anyMatch(Thread::isAlive));
String activeTimers = timers.stream()
.filter(Thread::isAlive)
.map(this::stringStackTrace)
.collect(Collectors.joining("\n\n"));

assertFalse(
"Found orphan Timer threads:\n" + activeTimers,
timers.stream().anyMatch(Thread::isAlive)
);
}

private String stringStackTrace(Thread thread) {
return Arrays.stream(thread.getStackTrace())
.map(StackTraceElement::toString)
.collect(Collectors.joining("\n"));
}

/**
Expand Down Expand Up @@ -2640,6 +2652,56 @@ public void channel_get_objects_throws_exception() throws AblyException {
}
}

/*
* @spec RTL3d1
*
* Given: an attached channel
* When: the connection drops and then reconnects
* Then: at the moment the connection fires onConnected, the channel state is already attaching
*/
@Test
public void channel_state_is_attaching_on_reconnected() throws AblyException {
ClientOptions opts = createOptions(testVars.keys[0].keyStr);
AblyRealtime ably = new AblyRealtime(opts);
ConnectionWaiter connectionWaiter = new ConnectionWaiter(ably.connection);

/* attach channel and wait for it to be fully attached */
final Channel channel = ably.channels.get("channel_state_is_attaching_on_reconnected_" + testParams.name);
ChannelWaiter channelWaiter = new ChannelWaiter(channel);
channel.attach();
channelWaiter.waitFor(ChannelState.attached);
assertEquals("Channel should be attached", ChannelState.attached, channel.state);

/* capture channel state at the moment the connection becomes connected */
final ChannelState[] channelStateOnConnected = new ChannelState[1];

/* drop the connection */
new Helpers.MutableConnectionManager(ably).disconnectAndSuppressRetries();
connectionWaiter.waitFor(ConnectionState.disconnected);
assertEquals("Connection should be disconnected", ConnectionState.disconnected, ably.connection.state);

Helpers.CompletionWaiter completionWaiter = new Helpers.CompletionWaiter();

ably.connection.on(ConnectionEvent.connected, stateChange -> {
synchronized (channelStateOnConnected) {
channelStateOnConnected[0] = channel.state;
completionWaiter.onSuccess();
}
});

/* reconnect */
ably.connection.connect();

completionWaiter.waitFor();

/* verify channel was attaching at the moment the connection became connected */
assertEquals(
"Channel state should be attaching when connection fires connected",
ChannelState.attaching,
channelStateOnConnected[0]
);
}

static class DetachingProtocolListener implements DebugOptions.RawProtocolListener {

public Channel theChannel;
Expand Down
Loading