From 041ce95d5e3e6e9466f226f0de04e48135a925dd Mon Sep 17 00:00:00 2001
From: shbatm
Date: Fri, 22 May 2026 15:10:55 -0500
Subject: [PATCH 1/2] fix(ws): hold SYNCING through post-connect status replay (#512)

WebSocketClient set ES_CONNECTED the instant the aiohttp socket opened,
before the controller's post-connect status replay (a burst of
ST/DON/DOF frames) drained. connection_events consumers that treat
ES_CONNECTED as "the stream is live" therefore saw the replay as live
events, firing spurious triggers on every connect, config-entry reload,
and reconnect.

Add an intermediate ES_SYNCING state held from socket-open until the
replay goes quiet for WS_SYNC_QUIET_SECONDS (1.0s, no frame), then
promote to ES_CONNECTED. A hard cap (WS_SYNC_MAX_SECONDS, 10.0s) keeps a
chatty controller from stalling the stream. A sampled watcher task does
the debounce so the read loop stays a plain `async for`; it is cancelled
in the read-loop finally, so a socket that drops before settling never
reports CONNECTED.

Records still update during SYNCING (the dispatcher keeps feeding); only
the "stream is live" signal is withheld. SYNCING is re-armed on every
reconnect. Status transitions now log at DEBUG.

Pure-additive: ES_SYNCING is a new enum value/alias; no existing ES_*
constant is renamed or repurposed. HA Core's isy994 only branches on
CONNECTED/DISCONNECTED/LOST_STREAM_CONNECTION/RECONNECTING/RECONNECT_FAILED
and gates entity availability on node.enabled (not isy.connected), so it
is unaffected until it opts into gating event-style entities on
connected.

Backport of pyisyox#170. Unblocks home-assistant/core#169782.
Co-Authored-By: Claude Opus 4.7 --- pyisy/constants.py | 2 + pyisy/events/websocket.py | 85 +++++++++++-- tests/test_websocket_lifecycle.py | 190 ++++++++++++++++++++++++++++-- 3 files changed, 259 insertions(+), 18 deletions(-) diff --git a/pyisy/constants.py b/pyisy/constants.py index fc73bbaa..5e34b4e9 100644 --- a/pyisy/constants.py +++ b/pyisy/constants.py @@ -30,6 +30,7 @@ class EventStreamStatus(StrEnum): LOST_STREAM_CONNECTION = "lost_stream_connection" CONNECTED = "connected" + SYNCING = "stream_syncing" DISCONNECTED = "disconnected" START_UPDATES = "start_updates" STOP_UPDATES = "stop_updates" @@ -46,6 +47,7 @@ class EventStreamStatus(StrEnum): # their string values, so existing `== ES_CONNECTED` checks keep working. ES_LOST_STREAM_CONNECTION = EventStreamStatus.LOST_STREAM_CONNECTION ES_CONNECTED = EventStreamStatus.CONNECTED +ES_SYNCING = EventStreamStatus.SYNCING ES_DISCONNECTED = EventStreamStatus.DISCONNECTED ES_START_UPDATES = EventStreamStatus.START_UPDATES ES_STOP_UPDATES = EventStreamStatus.STOP_UPDATES diff --git a/pyisy/events/websocket.py b/pyisy/events/websocket.py index 4efc5dc2..70b77d34 100644 --- a/pyisy/events/websocket.py +++ b/pyisy/events/websocket.py @@ -27,6 +27,7 @@ ES_NOT_STARTED, ES_RECONNECTING, ES_STOP_UPDATES, + ES_SYNCING, PROP_STATUS, TAG_EVENT_INFO, TAG_NODE, @@ -51,6 +52,15 @@ WS_MAX_RETRIES = 4 WS_RETRY_BACKOFF: list[float] = [0.01, 1, 10, 30, 60] # Seconds +# After the socket opens the controller replays every node's current +# status as a burst of ST/DON/DOF frames. The stream is held in +# ES_SYNCING until that burst goes quiet for WS_SYNC_QUIET_SECONDS (no +# frame), then flips to ES_CONNECTED. WS_SYNC_MAX_SECONDS is a hard cap +# so a perpetually chatty controller can never stall the stream in +# ES_SYNCING. Module-level so tests can monkeypatch them small. 
+WS_SYNC_QUIET_SECONDS: float = 1.0 +WS_SYNC_MAX_SECONDS: float = 10.0 + class WebSocketClient: """Class for handling web socket communications with the ISY.""" @@ -88,6 +98,11 @@ def __init__( self._program_key = None self.websocket_task: asyncio.Task[None] = None self.guardian_task: asyncio.Task[None] = None + # Watcher that promotes ES_SYNCING -> ES_CONNECTED once the + # post-connect status replay drains; _frame_count is the sample + # it reads to detect quiet. See _promote_when_quiet (#512). + self._sync_task: asyncio.Task[None] | None = None + self._frame_count: int = 0 if websession is None: websession = get_new_client_session(use_https, tls_ver) @@ -156,6 +171,7 @@ def status(self) -> str: def status(self, value): """Set the current node state and notify listeners.""" if self._status != value: + _LOGGER.debug("Event stream status: %s -> %s", self._status, value) self._status = value self.isy.connection_events.notify(self._status) return self._status @@ -241,6 +257,43 @@ def update_received(self, xmldoc: minidom.Element) -> None: self._sid = attr_from_xml(xmldoc, "Event", ATTR_STREAM_ID) _LOGGER.debug("ISY Updated Events Stream ID: %s", self._sid) + def _cancel_sync_task(self) -> None: + """Cancel the SYNCING quiet-window watcher, if running.""" + if self._sync_task is not None: + self._sync_task.cancel() + self._sync_task = None + + async def _promote_when_quiet(self) -> None: + """Promote ES_SYNCING -> ES_CONNECTED once the post-connect status + replay goes quiet (or the hard cap elapses). + + On connect the controller replays every node's current status as a + burst of frames; holding ES_SYNCING until the burst settles keeps + ``connection_events`` consumers from treating the replay as live + events (spurious triggers on every connect/restart; #512). Records + still update during ES_SYNCING — ``_route_message`` keeps feeding — + only the "stream is live" signal waits. 
+ + Sampled rather than event-driven so the read loop stays a plain + ``async for``: every WS_SYNC_QUIET_SECONDS we check whether any + frame arrived since the last sample. A window with no new frame is + treated as quiet (including the first window on a silent + controller). WS_SYNC_MAX_SECONDS caps the wait so a perpetually + chatty controller still goes live. Cancelled by ``websocket``'s + ``finally`` if the socket drops first, so a connection that never + settles never reports ES_CONNECTED. + """ + deadline = self._loop.time() + WS_SYNC_MAX_SECONDS + seen = self._frame_count + while self.status == ES_SYNCING: + await asyncio.sleep(WS_SYNC_QUIET_SECONDS) + quiet = self._frame_count == seen + seen = self._frame_count + if quiet or self._loop.time() >= deadline: + break + if self.status == ES_SYNCING: + self.status = ES_CONNECTED + async def websocket(self, retries: int = 0) -> None: """Start websocket connection.""" try: @@ -253,19 +306,29 @@ async def websocket(self, retries: int = 0) -> None: receive_timeout=self._hbwait + WS_HB_GRACE, ssl=self.sslcontext, ) as ws: - self.status = ES_CONNECTED retries = 0 _LOGGER.debug("Successfully connected to websocket.") - - async for msg in ws: - msg_type = msg.type - if msg_type is aiohttp.WSMsgType.TEXT: - await self._route_message(msg.data) - elif msg_type is aiohttp.WSMsgType.BINARY: - _LOGGER.warning("Unexpected binary message received.") - elif msg_type is aiohttp.WSMsgType.ERROR: - _LOGGER.error("Error during receive %s", ws.exception()) - break + # Socket is open, but the controller now replays every + # node's current status. Hold ES_SYNCING (not ES_CONNECTED) + # until that burst drains so consumers don't fire on the + # replay; the watcher promotes once it goes quiet (#512). 
+ self._frame_count = 0 + self.status = ES_SYNCING + self._sync_task = self._loop.create_task(self._promote_when_quiet()) + + try: + async for msg in ws: + msg_type = msg.type + if msg_type is aiohttp.WSMsgType.TEXT: + self._frame_count += 1 + await self._route_message(msg.data) + elif msg_type is aiohttp.WSMsgType.BINARY: + _LOGGER.warning("Unexpected binary message received.") + elif msg_type is aiohttp.WSMsgType.ERROR: + _LOGGER.error("Error during receive %s", ws.exception()) + break + finally: + self._cancel_sync_task() except asyncio.CancelledError: self.status = ES_DISCONNECTED diff --git a/tests/test_websocket_lifecycle.py b/tests/test_websocket_lifecycle.py index 745c5f08..a5f24863 100644 --- a/tests/test_websocket_lifecycle.py +++ b/tests/test_websocket_lifecycle.py @@ -26,7 +26,9 @@ ES_NOT_STARTED, ES_RECONNECTING, ES_STOP_UPDATES, + ES_SYNCING, ) +from pyisy.events import websocket as ws_module from pyisy.events.websocket import WS_MAX_RETRIES, WebSocketClient # -- fixtures --------------------------------------------------------- @@ -206,19 +208,38 @@ class _FakeWS: Yields a scripted sequence of ``msg`` objects, then completes the ``async for``. ``exception()`` and ``close_code`` mimic the real - surface so the post-loop branch in ``websocket()`` can read them.""" + surface so the post-loop branch in ``websocket()`` can read them. - def __init__(self, messages, exc=None, close_code: int = 1000) -> None: + ``keep_open=True`` leaves the socket blocked (open) after the scripted + frames drain, until ``close()`` is awaited — this lets a test watch + the SYNCING -> CONNECTED quiet-window promotion without the read loop + tearing the socket down first. 
Each ``__anext__`` suspends once + (``await asyncio.sleep(0)``) so the sync watcher task gets to run.""" + + def __init__(self, messages, exc=None, close_code: int = 1000, keep_open: bool = False) -> None: self._messages = list(messages) self._exc = exc self.close_code = close_code + self._keep_open = keep_open + self.closed = False + self._closed_evt = asyncio.Event() def __aiter__(self): - async def _gen(): - for m in self._messages: - yield m - - return _gen() + return self + + async def __anext__(self): + if self.closed: + raise StopAsyncIteration + if self._messages: + await asyncio.sleep(0) + return self._messages.pop(0) + if self._keep_open: + await self._closed_evt.wait() + raise StopAsyncIteration + + async def close(self) -> None: + self.closed = True + self._closed_evt.set() def exception(self): return self._exc @@ -397,6 +418,161 @@ def _exception_side_effect(): reconnect.assert_not_called() +# -- SYNCING quiet-window gate (#512) --------------------------------- + + +async def test_promote_when_quiet_promotes_after_idle(ws_client: WebSocketClient, monkeypatch) -> None: + """A silent controller (replay already drained) promotes + SYNCING -> CONNECTED after a single quiet window.""" + monkeypatch.setattr(ws_module, "WS_SYNC_QUIET_SECONDS", 0.01) + monkeypatch.setattr(ws_module, "WS_SYNC_MAX_SECONDS", 1.0) + ws_client._status = ES_SYNCING + ws_client._frame_count = 3 # frames already received; none arriving now + await ws_client._promote_when_quiet() + assert ws_client.status == ES_CONNECTED + + +async def test_promote_when_quiet_holds_until_replay_drains(ws_client: WebSocketClient, monkeypatch) -> None: + """While the post-connect replay keeps delivering frames the watcher + holds SYNCING; once the burst goes quiet it promotes to CONNECTED. 
+ This is the core guard against spurious triggers on every connect.""" + # QUIET (0.2s) is 20x the replay frame interval (0.01s), so a false + # "quiet" reading would require the replay task to be starved for a + # whole window (~20 missed wakeups) -- not a realistic CI stall. The + # observation window below spans two full quiet windows. + monkeypatch.setattr(ws_module, "WS_SYNC_QUIET_SECONDS", 0.2) + monkeypatch.setattr(ws_module, "WS_SYNC_MAX_SECONDS", 5.0) + ws_client._status = ES_SYNCING + ws_client._frame_count = 0 + + stop = asyncio.Event() + + async def _replay() -> None: + # Frames arrive far faster than the quiet window, so every sample + # the watcher takes sees movement -> never quiet. + while not stop.is_set(): + ws_client._frame_count += 1 + await asyncio.sleep(0.01) + + replay = asyncio.create_task(_replay()) + watcher = asyncio.create_task(ws_client._promote_when_quiet()) + try: + # Two quiet windows pass while the replay is still busy. + await asyncio.sleep(0.5) + assert not watcher.done() + assert ws_client.status == ES_SYNCING + + # Replay drains -> the next quiet sample promotes to CONNECTED. 
+ stop.set() + await replay + await asyncio.wait_for(watcher, timeout=2.0) + assert ws_client.status == ES_CONNECTED + finally: + stop.set() + watcher.cancel() + + +async def test_promote_when_quiet_hard_cap_under_constant_traffic( + ws_client: WebSocketClient, monkeypatch +) -> None: + """A perpetually chatty controller (frames never stop) must still + promote at the WS_SYNC_MAX_SECONDS hard cap rather than stall in + SYNCING forever.""" + monkeypatch.setattr(ws_module, "WS_SYNC_QUIET_SECONDS", 0.05) + monkeypatch.setattr(ws_module, "WS_SYNC_MAX_SECONDS", 0.15) + ws_client._status = ES_SYNCING + ws_client._frame_count = 0 + + stop = asyncio.Event() + + async def _flood() -> None: + while not stop.is_set(): + ws_client._frame_count += 1 + await asyncio.sleep(0.005) + + flood = asyncio.create_task(_flood()) + try: + await asyncio.wait_for(ws_client._promote_when_quiet(), timeout=2.0) + assert ws_client.status == ES_CONNECTED + finally: + stop.set() + await asyncio.gather(flood, return_exceptions=True) + + +async def test_websocket_holds_syncing_through_replay_then_connects( + ws_client: WebSocketClient, monkeypatch +) -> None: + """End-to-end through ``websocket()``: a replayed frame is routed + (records still update) while the stream stays SYNCING, and CONNECTED + is only emitted after the quiet window.""" + # A wide quiet window (0.2s) keeps the "still SYNCING" assertion below + # well clear of the watcher's first post-frame sample -- the routed + # poll breaks within a few ms, leaving ~0.2s of slack before promotion + # becomes possible, so a scheduler hiccup can't race it. 
+ monkeypatch.setattr(ws_module, "WS_SYNC_QUIET_SECONDS", 0.2) + monkeypatch.setattr(ws_module, "WS_SYNC_MAX_SECONDS", 5.0) + ws = _FakeWS(messages=[_ws_msg(aiohttp.WSMsgType.TEXT, "")], keep_open=True) + ws_client.req_session.ws_connect = _ws_connect_returning(ws) + routed: list[str] = [] + + async def _capture(self, msg): + routed.append(msg) + + with ( + patch.object(WebSocketClient, "_route_message", _capture), + patch.object(WebSocketClient, "_reconnect"), + ): + task = asyncio.create_task(ws_client.websocket()) + try: + # Wait for the replayed frame to be routed into state. + for _ in range(200): + if routed: + break + await asyncio.sleep(0.005) + await asyncio.sleep(0) # flush any same-tick callbacks + + # Replay routed (state synced) but the stream is not yet live. + assert routed == [""] + assert ws_client.status == ES_SYNCING + + # After the quiet window the stream goes live. + for _ in range(200): + if ws_client.status == ES_CONNECTED: + break + await asyncio.sleep(0.01) + assert ws_client.status == ES_CONNECTED + finally: + await ws.close() + await asyncio.wait_for(task, timeout=2.0) + + +async def test_websocket_socket_drop_before_quiet_never_connects( + ws_client: WebSocketClient, monkeypatch +) -> None: + """If the socket drops before the replay settles, the watcher is + cancelled in the ``finally`` and CONNECTED is never emitted — a + connection that never settles must not report itself as live.""" + monkeypatch.setattr(ws_module, "WS_SYNC_QUIET_SECONDS", 5.0) + monkeypatch.setattr(ws_module, "WS_SYNC_MAX_SECONDS", 10.0) + ws = _FakeWS(messages=[_ws_msg(aiohttp.WSMsgType.TEXT, "")]) # drains then closes + ws_client.req_session.ws_connect = _ws_connect_returning(ws) + notifications: list[str] = [] + ws_client.isy.connection_events.notify = notifications.append + + async def _capture(self, msg): + return None + + with ( + patch.object(WebSocketClient, "_route_message", _capture), + patch.object(WebSocketClient, "_reconnect"), + ): + await 
ws_client.websocket() + + assert ES_SYNCING in notifications + assert ES_CONNECTED not in notifications + assert ws_client._sync_task is None # cancelled in the finally + + # -- helpers ---------------------------------------------------------- From 18c0107a532ec43b638cfe6793edbc489fecc641 Mon Sep 17 00:00:00 2001 From: shbatm Date: Fri, 22 May 2026 15:15:42 -0500 Subject: [PATCH 2/2] docs(ws): trim duplicated SYNCING rationale, note first-frame assumption Collapse the replay/SYNCING explanation that was restated in the module constant comment and the websocket() inline comment down to the canonical _promote_when_quiet docstring, and document that the quiet window is anchored to socket-open (the known first-frame-latency limitation, matching pyisyox). No behaviour change. Co-Authored-By: Claude Opus 4.7 --- pyisy/events/websocket.py | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/pyisy/events/websocket.py b/pyisy/events/websocket.py index 70b77d34..17877f76 100644 --- a/pyisy/events/websocket.py +++ b/pyisy/events/websocket.py @@ -52,12 +52,8 @@ WS_MAX_RETRIES = 4 WS_RETRY_BACKOFF: list[float] = [0.01, 1, 10, 30, 60] # Seconds -# After the socket opens the controller replays every node's current -# status as a burst of ST/DON/DOF frames. The stream is held in -# ES_SYNCING until that burst goes quiet for WS_SYNC_QUIET_SECONDS (no -# frame), then flips to ES_CONNECTED. WS_SYNC_MAX_SECONDS is a hard cap -# so a perpetually chatty controller can never stall the stream in -# ES_SYNCING. Module-level so tests can monkeypatch them small. +# Quiet-window / hard-cap timings for the post-connect SYNCING gate (see +# _promote_when_quiet). Module-level so tests can monkeypatch them small. WS_SYNC_QUIET_SECONDS: float = 1.0 WS_SYNC_MAX_SECONDS: float = 10.0 @@ -282,6 +278,12 @@ async def _promote_when_quiet(self) -> None: chatty controller still goes live. 
Cancelled by ``websocket``'s ``finally`` if the socket drops first, so a connection that never settles never reports ES_CONNECTED. + + The window is anchored to socket-open, not to the first replayed + frame: the gate assumes the replay begins within the first quiet + window (true in practice — IoX serves it from cache on the + subscribe round-trip). A pathologically delayed first frame is the + known limitation, accepted to keep the silent-controller path fast. """ deadline = self._loop.time() + WS_SYNC_MAX_SECONDS seen = self._frame_count @@ -308,10 +310,9 @@ async def websocket(self, retries: int = 0) -> None: ) as ws: retries = 0 _LOGGER.debug("Successfully connected to websocket.") - # Socket is open, but the controller now replays every - # node's current status. Hold ES_SYNCING (not ES_CONNECTED) - # until that burst drains so consumers don't fire on the - # replay; the watcher promotes once it goes quiet (#512). + # Hold ES_SYNCING through the controller's post-connect status + # replay so consumers don't fire on it; watcher promotes once + # quiet (#512). self._frame_count = 0 self.status = ES_SYNCING self._sync_task = self._loop.create_task(self._promote_when_quiet())