Skip to content

Commit fe81910

Browse files
authored
Merge pull request #667 from ably/AIT-316/annotations-support
[AIT-316] feat: introduce support for message annotations
2 parents b0499a9 + 0f90c68 commit fe81910

21 files changed

Lines changed: 2002 additions & 103 deletions

ably/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,10 @@
44
from ably.rest.auth import Auth
55
from ably.rest.push import Push
66
from ably.rest.rest import AblyRest
7+
from ably.types.annotation import Annotation, AnnotationAction
78
from ably.types.capability import Capability
9+
from ably.types.channelmode import ChannelMode
10+
from ably.types.channeloptions import ChannelOptions
811
from ably.types.channelsubscription import PushChannelSubscription
912
from ably.types.device import DeviceDetails
1013
from ably.types.message import MessageAction, MessageVersion

ably/realtime/annotations.py

Lines changed: 263 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,263 @@
1+
from __future__ import annotations
2+
3+
import logging
4+
from typing import TYPE_CHECKING
5+
6+
from ably.rest.annotations import RestAnnotations, construct_validate_annotation
7+
from ably.transport.websockettransport import ProtocolMessageAction
8+
from ably.types.annotation import Annotation, AnnotationAction
9+
from ably.types.channelmode import ChannelMode
10+
from ably.types.channelstate import ChannelState
11+
from ably.util.eventemitter import EventEmitter
12+
from ably.util.helper import is_callable_or_coroutine
13+
14+
if TYPE_CHECKING:
15+
from ably.realtime.channel import RealtimeChannel
16+
from ably.realtime.connectionmanager import ConnectionManager
17+
18+
log = logging.getLogger(__name__)
19+
20+
21+
class RealtimeAnnotations:
22+
"""
23+
Provides realtime methods for managing annotations on messages,
24+
including publishing annotations and subscribing to annotation events.
25+
"""
26+
27+
__connection_manager: ConnectionManager
28+
__channel: RealtimeChannel
29+
30+
def __init__(self, channel: RealtimeChannel, connection_manager: ConnectionManager):
31+
"""
32+
Initialize RealtimeAnnotations.
33+
34+
Args:
35+
channel: The Realtime Channel this annotations instance belongs to
36+
"""
37+
self.__channel = channel
38+
self.__connection_manager = connection_manager
39+
self.__subscriptions = EventEmitter()
40+
self.__rest_annotations = RestAnnotations(channel)
41+
42+
async def __send_annotation(self, annotation: Annotation, params: dict | None = None):
43+
"""
44+
Internal method to send an annotation via the realtime connection.
45+
46+
Args:
47+
annotation: Validated Annotation object with action and message_serial set
48+
params: Optional dict of query parameters
49+
"""
50+
# Check if channel and connection are in publishable state
51+
self.__channel._throw_if_unpublishable_state()
52+
53+
log.info(
54+
f'RealtimeAnnotations: sending annotation, channelName = {self.__channel.name}, '
55+
f'messageSerial = {annotation.message_serial}, '
56+
f'type = {annotation.type}, action = {annotation.action}'
57+
)
58+
59+
# Convert to wire format (array of annotations)
60+
wire_annotation = annotation.as_dict(binary=self.__channel.ably.options.use_binary_protocol)
61+
62+
# Build protocol message
63+
protocol_message = {
64+
"action": ProtocolMessageAction.ANNOTATION,
65+
"channel": self.__channel.name,
66+
"annotations": [wire_annotation],
67+
}
68+
69+
if params:
70+
# Stringify boolean params
71+
stringified_params = {k: str(v).lower() if isinstance(v, bool) else v for k, v in params.items()}
72+
protocol_message["params"] = stringified_params
73+
74+
# Send via WebSocket
75+
await self.__connection_manager.send_protocol_message(protocol_message)
76+
77+
async def publish(self, msg_or_serial, annotation: Annotation, params: dict | None = None):
78+
"""
79+
Publish an annotation on a message via the realtime connection.
80+
81+
Args:
82+
msg_or_serial: Either a message serial (string) or a Message object
83+
annotation: Annotation object
84+
params: Optional dict of query parameters
85+
86+
Returns:
87+
None
88+
89+
Raises:
90+
AblyException: If the request fails, inputs are invalid, or channel is in unpublishable state
91+
"""
92+
annotation = construct_validate_annotation(msg_or_serial, annotation)
93+
94+
# RSAN1c1/RTAN1a: Explicitly set action to ANNOTATION_CREATE
95+
annotation = annotation._copy_with(action=AnnotationAction.ANNOTATION_CREATE)
96+
97+
await self.__send_annotation(annotation, params)
98+
99+
async def delete(
100+
self,
101+
msg_or_serial,
102+
annotation: Annotation,
103+
params: dict | None = None,
104+
):
105+
"""
106+
Delete an annotation on a message.
107+
108+
Args:
109+
msg_or_serial: Either a message serial (string) or a Message object
110+
annotation: Annotation containing annotation properties
111+
params: Optional dict of query parameters
112+
113+
Returns:
114+
None
115+
116+
Raises:
117+
AblyException: If the request fails or inputs are invalid
118+
"""
119+
annotation = construct_validate_annotation(msg_or_serial, annotation)
120+
121+
# RSAN2a/RTAN2a: Explicitly set action to ANNOTATION_DELETE
122+
annotation = annotation._copy_with(action=AnnotationAction.ANNOTATION_DELETE)
123+
124+
await self.__send_annotation(annotation, params)
125+
126+
async def subscribe(self, *args):
127+
"""
128+
Subscribe to annotation events on this channel.
129+
130+
Parameters
131+
----------
132+
*args: type_or_types, listener
133+
Subscribe type(s) and listener
134+
135+
arg1(type_or_types): str or list[str], optional
136+
Subscribe to annotations of the given type or types (RTAN4c)
137+
138+
arg2(listener): callable
139+
Subscribe to all annotations on the channel
140+
141+
When no type is provided, arg1 is used as the listener.
142+
143+
Raises
144+
------
145+
ValueError
146+
If no valid subscribe arguments are passed
147+
"""
148+
# Parse arguments similar to channel.subscribe
149+
if len(args) == 0:
150+
raise ValueError("annotations.subscribe called without arguments")
151+
152+
annotation_types = None
153+
154+
# RTAN4c: Support string or list of strings as first argument
155+
if len(args) >= 2 and isinstance(args[0], (str, list)):
156+
if isinstance(args[0], list):
157+
annotation_types = args[0]
158+
else:
159+
annotation_types = [args[0]]
160+
if not args[1]:
161+
raise ValueError("annotations.subscribe called without listener")
162+
if not is_callable_or_coroutine(args[1]):
163+
raise ValueError("subscribe listener must be function or coroutine function")
164+
listener = args[1]
165+
elif is_callable_or_coroutine(args[0]):
166+
listener = args[0]
167+
else:
168+
raise ValueError('invalid subscribe arguments')
169+
170+
# RTAN4d: Implicitly attach channel on subscribe
171+
await self.__channel.attach()
172+
173+
# RTAN4e: Check if ANNOTATION_SUBSCRIBE mode is enabled (log warning per spec),
174+
# only when server explicitly sent modes (non-empty list)
175+
if self.__channel.state == ChannelState.ATTACHED and self.__channel.modes:
176+
if ChannelMode.ANNOTATION_SUBSCRIBE not in self.__channel.modes:
177+
log.warning(
178+
"You are trying to add an annotation listener, but the "
179+
"ANNOTATION_SUBSCRIBE channel mode was not included in the ATTACHED flags. "
180+
"This subscription may not receive annotations. Ensure you request the "
181+
"annotation_subscribe channel mode in ChannelOptions."
182+
)
183+
184+
# Register subscription after successful attach
185+
if annotation_types is not None:
186+
for t in annotation_types:
187+
self.__subscriptions.on(t, listener)
188+
else:
189+
self.__subscriptions.on(listener)
190+
191+
def unsubscribe(self, *args):
192+
"""
193+
Unsubscribe from annotation events on this channel.
194+
195+
Parameters
196+
----------
197+
*args: type_or_types, listener
198+
Unsubscribe type(s) and listener
199+
200+
arg1(type_or_types): str or list[str], optional
201+
Unsubscribe from annotations of the given type or types
202+
203+
arg2(listener): callable
204+
Unsubscribe from all annotations on the channel
205+
206+
When no type is provided, arg1 is used as the listener.
207+
When no arguments are provided, unsubscribes all annotation listeners (RTAN5).
208+
209+
Raises
210+
------
211+
ValueError
212+
If invalid unsubscribe arguments are passed
213+
"""
214+
# RTAN5: Support no arguments to unsubscribe all annotation listeners
215+
if len(args) == 0:
216+
self.__subscriptions.off()
217+
elif len(args) >= 2 and isinstance(args[0], (str, list)):
218+
# RTAN5a: Support string or list of strings for type(s)
219+
if isinstance(args[0], list):
220+
annotation_types = args[0]
221+
else:
222+
annotation_types = [args[0]]
223+
listener = args[1]
224+
for t in annotation_types:
225+
self.__subscriptions.off(t, listener)
226+
elif is_callable_or_coroutine(args[0]):
227+
listener = args[0]
228+
self.__subscriptions.off(listener)
229+
else:
230+
raise ValueError('invalid unsubscribe arguments')
231+
232+
def _process_incoming(self, incoming_annotations):
233+
"""
234+
Process incoming annotations from the server.
235+
236+
This is called internally when ANNOTATION protocol messages are received.
237+
238+
Args:
239+
incoming_annotations: List of Annotation objects received from the server
240+
"""
241+
for annotation in incoming_annotations:
242+
# Emit to type-specific listeners and catch-all listeners
243+
annotation_type = annotation.type or ''
244+
self.__subscriptions._emit(annotation_type, annotation)
245+
246+
async def get(self, msg_or_serial, params: dict | None = None):
247+
"""
248+
Retrieve annotations for a message with pagination support.
249+
250+
This delegates to the REST implementation.
251+
252+
Args:
253+
msg_or_serial: Either a message serial (string) or a Message object
254+
params: Optional dict of query parameters (limit, start, end, direction)
255+
256+
Returns:
257+
PaginatedResult: A paginated result containing Annotation objects
258+
259+
Raises:
260+
AblyException: If the request fails or serial is invalid
261+
"""
262+
# Delegate to REST implementation
263+
return await self.__rest_annotations.get(msg_or_serial, params)

0 commit comments

Comments
 (0)