Skip to content

Commit 3375a5b

Browse files
authored
Merge pull request #1158 from ably/feature/liveobjects-use-server-gc-period
[ECO-5056] use server-provided GC grace period for garbage collection
2 parents da4c60f + 06a2157 commit 3375a5b

15 files changed

Lines changed: 195 additions & 73 deletions

File tree

lib/src/main/java/io/ably/lib/objects/Adapter.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import io.ably.lib.realtime.AblyRealtime;
44
import io.ably.lib.realtime.ChannelBase;
5-
import io.ably.lib.transport.ConnectionManager;
5+
import io.ably.lib.realtime.Connection;
66
import io.ably.lib.types.AblyException;
77
import io.ably.lib.types.ClientOptions;
88
import io.ably.lib.types.ErrorInfo;
@@ -23,8 +23,8 @@ public Adapter(@NotNull AblyRealtime ably) {
2323
}
2424

2525
@Override
26-
public @NotNull ConnectionManager getConnectionManager() {
27-
return ably.connection.connectionManager;
26+
public @NotNull Connection getConnection() {
27+
return ably.connection;
2828
}
2929

3030
@Override

lib/src/main/java/io/ably/lib/objects/ObjectsAdapter.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package io.ably.lib.objects;
22

33
import io.ably.lib.realtime.ChannelBase;
4-
import io.ably.lib.transport.ConnectionManager;
4+
import io.ably.lib.realtime.Connection;
55
import io.ably.lib.types.AblyException;
66
import io.ably.lib.types.ClientOptions;
77
import org.jetbrains.annotations.Blocking;
@@ -18,13 +18,13 @@ public interface ObjectsAdapter {
1818
@NotNull ClientOptions getClientOptions();
1919

2020
/**
21-
* Retrieves the connection manager for handling connection state and operations.
21+
* Retrieves the connection instance for handling connection state and operations.
2222
* Used to check connection status, obtain error information, and manage
2323
* message transmission across the Ably connection.
2424
*
25-
* @return the connection manager instance
25+
* @return the connection instance
2626
*/
27-
@NotNull ConnectionManager getConnectionManager();
27+
@NotNull Connection getConnection();
2828

2929
/**
3030
* Retrieves the current time in milliseconds from the Ably server.

lib/src/main/java/io/ably/lib/transport/ConnectionManager.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ public class ConnectionManager implements ConnectListener {
104104
* This field is initialized only if the LiveObjects plugin is present in the classpath.
105105
*/
106106
private final LiveObjectsPlugin liveObjectsPlugin;
107+
public Long objectsGCGracePeriod = null;
107108

108109
/**
109110
* Methods on the channels map owned by the {@link AblyRealtime} instance
@@ -1319,6 +1320,7 @@ private synchronized void onConnected(ProtocolMessage message) {
13191320
connectionStateTtl = connectionDetails.connectionStateTtl;
13201321
maxMessageSize = connectionDetails.maxMessageSize;
13211322
siteCode = connectionDetails.siteCode; // CD2j
1323+
objectsGCGracePeriod = connectionDetails.objectsGCGracePeriod;
13221324

13231325
/* set the clientId resolved from token, if any */
13241326
String clientId = connectionDetails.clientId;

lib/src/main/java/io/ably/lib/types/ConnectionDetails.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,11 @@ public class ConnectionDetails {
8181
*/
8282
public String siteCode;
8383

84+
/**
85+
* The duration in milliseconds used to retain tombstoned objects at client side.
86+
*/
87+
public Long objectsGCGracePeriod;
88+
8489
ConnectionDetails() {
8590
maxIdleInterval = Defaults.maxIdleInterval;
8691
connectionStateTtl = Defaults.connectionStateTtl;
@@ -124,6 +129,9 @@ ConnectionDetails readMsgpack(MessageUnpacker unpacker) throws IOException {
124129
case "siteCode":
125130
siteCode = unpacker.unpackString();
126131
break;
132+
case "objectsGCGracePeriod":
133+
objectsGCGracePeriod = unpacker.unpackLong();
134+
break;
127135
default:
128136
Log.v(TAG, "Unexpected field: " + fieldName);
129137
unpacker.skipValue();

lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2617,6 +2617,29 @@ public void connect_should_not_rewrite_immediate_attach() throws AblyException {
26172617
}
26182618
}
26192619

2620+
@Test
2621+
public void channel_get_objects_throws_exception() throws AblyException {
2622+
ClientOptions opts = createOptions(testVars.keys[0].keyStr);
2623+
try (AblyRealtime ably = new AblyRealtime(opts)) {
2624+
2625+
/* wait until connected */
2626+
new ConnectionWaiter(ably.connection).waitFor(ConnectionState.connected);
2627+
assertEquals("Verify connected state reached", ably.connection.state, ConnectionState.connected);
2628+
2629+
/* create a channel and attach */
2630+
final Channel channel = ably.channels.get("channel");
2631+
channel.attach();
2632+
new ChannelWaiter(channel).waitFor(ChannelState.attached);
2633+
assertEquals("Verify attached state reached", channel.state, ChannelState.attached);
2634+
2635+
AblyException exception = assertThrows(AblyException.class, channel::getObjects);
2636+
assertNotNull(exception);
2637+
assertEquals(40019, exception.errorInfo.code);
2638+
assertEquals(400, exception.errorInfo.statusCode);
2639+
assertTrue(exception.errorInfo.message.contains("LiveObjects plugin hasn't been installed"));
2640+
}
2641+
}
2642+
26202643
static class DetachingProtocolListener implements DebugOptions.RawProtocolListener {
26212644

26222645
public Channel theChannel;

liveobjects/src/main/kotlin/io/ably/lib/objects/Helpers.kt

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package io.ably.lib.objects
33
import io.ably.lib.realtime.ChannelState
44
import io.ably.lib.realtime.CompletionListener
55
import io.ably.lib.types.Callback
6+
import io.ably.lib.realtime.ConnectionEvent
7+
import io.ably.lib.realtime.ConnectionStateListener
68
import io.ably.lib.types.ChannelMode
79
import io.ably.lib.types.ErrorInfo
810
import io.ably.lib.types.ProtocolMessage
@@ -12,6 +14,8 @@ import kotlinx.coroutines.suspendCancellableCoroutine
1214
import kotlin.coroutines.resume
1315
import kotlin.coroutines.resumeWithException
1416

17+
internal val ObjectsAdapter.connectionManager get() = connection.connectionManager
18+
1519
/**
1620
* Spec: RTO15g
1721
*/
@@ -47,6 +51,16 @@ internal suspend fun ObjectsAdapter.attachAsync(channelName: String) = suspendCa
4751
}
4852
}
4953

54+
internal fun ObjectsAdapter.onGCGracePeriodUpdated(block : (Long?) -> Unit) : ObjectsSubscription {
55+
connectionManager.objectsGCGracePeriod?.let { block(it) }
56+
// Return new objectsGCGracePeriod whenever connection state changes to connected
57+
val listener: (_: ConnectionStateListener.ConnectionStateChange) -> Unit = {
58+
block(connectionManager.objectsGCGracePeriod)
59+
}
60+
connection.on(ConnectionEvent.connected, listener)
61+
return ObjectsSubscription { connection.off(listener) }
62+
}
63+
5064
/**
5165
* Retrieves the channel modes for a specific channel.
5266
* This method returns the modes that are set for the specified channel.

liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsPool.kt

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@ import java.util.concurrent.ConcurrentHashMap
1414
internal object ObjectsPoolDefaults {
1515
const val GC_INTERVAL_MS = 1000L * 60 * 5 // 5 minutes
1616
/**
17+
* The SDK will attempt to use the `objectsGCGracePeriod` value provided by the server in the `connectionDetails`
18+
* object of the `CONNECTED` event.
19+
* If the server does not provide this value, the SDK will fall back to this default value.
1720
* Must be > 2 minutes to ensure we keep tombstones long enough to avoid the possibility of receiving an operation
1821
* with an earlier serial that would not have been applied if the tombstone still existed.
1922
*
@@ -49,10 +52,19 @@ internal class ObjectsPool(
4952
private val gcScope = CoroutineScope(Dispatchers.Default + SupervisorJob())
5053
private var gcJob: Job // Job for the garbage collection coroutine
5154

55+
@Volatile private var gcGracePeriod = ObjectsPoolDefaults.GC_GRACE_PERIOD_MS
56+
private var gcPeriodSubscription: ObjectsSubscription
57+
5258
init {
5359
// RTO3b - Initialize pool with root object
5460
pool[ROOT_OBJECT_ID] = DefaultLiveMap.zeroValue(ROOT_OBJECT_ID, realtimeObjects)
55-
// Start garbage collection coroutine
61+
// Start garbage collection coroutine with server-provided grace period if available
62+
gcPeriodSubscription = realtimeObjects.adapter.onGCGracePeriodUpdated { period ->
63+
period?.let {
64+
gcGracePeriod = it
65+
Log.i(tag, "Using objectsGCGracePeriod from server: $gcGracePeriod ms")
66+
} ?: Log.i(tag, "Server did not provide objectsGCGracePeriod, using default: $gcGracePeriod ms")
67+
}
5668
gcJob = startGCJob()
5769
}
5870

@@ -123,9 +135,9 @@ internal class ObjectsPool(
123135
*/
124136
private fun onGCInterval() {
125137
pool.entries.removeIf { (_, obj) ->
126-
if (obj.isEligibleForGc()) { true } // Remove from pool
138+
if (obj.isEligibleForGc(gcGracePeriod)) { true } // Remove from pool
127139
else {
128-
obj.onGCInterval()
140+
obj.onGCInterval(gcGracePeriod)
129141
false // Keep in pool
130142
}
131143
}
@@ -152,6 +164,7 @@ internal class ObjectsPool(
152164
* Should be called when the pool is no longer needed.
153165
*/
154166
fun dispose() {
167+
gcPeriodSubscription.unsubscribe()
155168
gcJob.cancel()
156169
gcScope.cancel()
157170
pool.clear()

liveobjects/src/main/kotlin/io/ably/lib/objects/type/BaseRealtimeObject.kt

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import io.ably.lib.objects.ObjectMessage
44
import io.ably.lib.objects.ObjectOperation
55
import io.ably.lib.objects.ObjectState
66
import io.ably.lib.objects.ObjectsOperationSource
7-
import io.ably.lib.objects.ObjectsPoolDefaults
87
import io.ably.lib.objects.objectError
98
import io.ably.lib.objects.type.livecounter.noOpCounterUpdate
109
import io.ably.lib.objects.type.livemap.noOpMapUpdate
@@ -138,10 +137,20 @@ internal abstract class BaseRealtimeObject(
138137

139138
/**
140139
* Checks if the object is eligible for garbage collection.
140+
*
141+
* An object is eligible for garbage collection if it has been tombstoned and
142+
* the time since tombstoning exceeds the specified grace period.
143+
*
144+
* @param gcGracePeriod The grace period in milliseconds that tombstoned objects
145+
* should be kept before being eligible for collection.
146+
* This value is retrieved from the server's connection details
147+
* or defaults to 24 hours if not provided by the server.
148+
* @return true if the object is tombstoned and the grace period has elapsed,
149+
* false otherwise
141150
*/
142-
internal fun isEligibleForGc(): Boolean {
151+
internal fun isEligibleForGc(gcGracePeriod: Long): Boolean {
143152
val currentTime = System.currentTimeMillis()
144-
return isTombstoned && tombstonedAt?.let { currentTime - it >= ObjectsPoolDefaults.GC_GRACE_PERIOD_MS } == true
153+
return isTombstoned && tombstonedAt?.let { currentTime - it >= gcGracePeriod } == true
145154
}
146155

147156
/**
@@ -198,12 +207,22 @@ internal abstract class BaseRealtimeObject(
198207
/**
199208
* Called during garbage collection intervals to clean up expired entries.
200209
*
210+
* This method is invoked periodically (every 5 minutes) by the ObjectsPool
211+
* to perform cleanup of tombstoned data that has exceeded the grace period.
212+
*
201213
* This method should identify and remove entries that:
202214
* - Have been marked as tombstoned
203-
* - Have a tombstone timestamp older than the configured grace period
215+
* - Have a tombstone timestamp older than the specified grace period
216+
*
217+
* @param gcGracePeriod The grace period in milliseconds that tombstoned entries
218+
* should be kept before being eligible for removal.
219+
* This value is retrieved from the server's connection details
220+
* or defaults to 24 hours if not provided by the server.
221+
* Must be greater than 2 minutes to ensure proper operation
222+
* ordering and avoid issues with delayed operations.
204223
*
205224
* Implementations typically use single-pass removal techniques to
206225
* efficiently clean up expired data without creating temporary collections.
207226
*/
208-
abstract fun onGCInterval()
227+
abstract fun onGCInterval(gcGracePeriod: Long)
209228
}

liveobjects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ internal class DefaultLiveCounter private constructor(
111111
liveCounterManager.notify(update as LiveCounterUpdate)
112112
}
113113

114-
override fun onGCInterval() {
114+
override fun onGCInterval(gcGracePeriod: Long) {
115115
// Nothing to GC for a counter object
116116
return
117117
}

liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -190,8 +190,8 @@ internal class DefaultLiveMap private constructor(
190190
liveMapManager.notify(update as LiveMapUpdate)
191191
}
192192

193-
override fun onGCInterval() {
194-
data.entries.removeIf { (_, entry) -> entry.isEligibleForGc() }
193+
override fun onGCInterval(gcGracePeriod: Long) {
194+
data.entries.removeIf { (_, entry) -> entry.isEligibleForGc(gcGracePeriod) }
195195
}
196196

197197
companion object {

0 commit comments

Comments
 (0)