Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pyisy/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class EventStreamStatus(StrEnum):

LOST_STREAM_CONNECTION = "lost_stream_connection"
CONNECTED = "connected"
SYNCING = "stream_syncing"
DISCONNECTED = "disconnected"
START_UPDATES = "start_updates"
STOP_UPDATES = "stop_updates"
Expand All @@ -46,6 +47,7 @@ class EventStreamStatus(StrEnum):
# their string values, so existing `== ES_CONNECTED` checks keep working.
ES_LOST_STREAM_CONNECTION = EventStreamStatus.LOST_STREAM_CONNECTION
ES_CONNECTED = EventStreamStatus.CONNECTED
ES_SYNCING = EventStreamStatus.SYNCING
ES_DISCONNECTED = EventStreamStatus.DISCONNECTED
ES_START_UPDATES = EventStreamStatus.START_UPDATES
ES_STOP_UPDATES = EventStreamStatus.STOP_UPDATES
Expand Down
86 changes: 75 additions & 11 deletions pyisy/events/websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
ES_NOT_STARTED,
ES_RECONNECTING,
ES_STOP_UPDATES,
ES_SYNCING,
PROP_STATUS,
TAG_EVENT_INFO,
TAG_NODE,
Expand All @@ -51,6 +52,11 @@
WS_MAX_RETRIES = 4
WS_RETRY_BACKOFF: list[float] = [0.01, 1, 10, 30, 60] # Seconds

# Quiet-window / hard-cap timings for the post-connect SYNCING gate (see
# _promote_when_quiet). Module-level so tests can monkeypatch them small.
WS_SYNC_QUIET_SECONDS: float = 1.0
WS_SYNC_MAX_SECONDS: float = 10.0


class WebSocketClient:
"""Class for handling web socket communications with the ISY."""
Expand Down Expand Up @@ -88,6 +94,11 @@ def __init__(
self._program_key = None
self.websocket_task: asyncio.Task[None] = None
self.guardian_task: asyncio.Task[None] = None
# Watcher that promotes ES_SYNCING -> ES_CONNECTED once the
# post-connect status replay drains; _frame_count is the sample
# it reads to detect quiet. See _promote_when_quiet (#512).
self._sync_task: asyncio.Task[None] | None = None
self._frame_count: int = 0

if websession is None:
websession = get_new_client_session(use_https, tls_ver)
Expand Down Expand Up @@ -156,6 +167,7 @@ def status(self) -> str:
def status(self, value):
"""Set the current node state and notify listeners."""
if self._status != value:
_LOGGER.debug("Event stream status: %s -> %s", self._status, value)
self._status = value
self.isy.connection_events.notify(self._status)
return self._status
Expand Down Expand Up @@ -241,6 +253,49 @@ def update_received(self, xmldoc: minidom.Element) -> None:
self._sid = attr_from_xml(xmldoc, "Event", ATTR_STREAM_ID)
_LOGGER.debug("ISY Updated Events Stream ID: %s", self._sid)

def _cancel_sync_task(self) -> None:
"""Cancel the SYNCING quiet-window watcher, if running."""
if self._sync_task is not None:
self._sync_task.cancel()
self._sync_task = None

async def _promote_when_quiet(self) -> None:
"""Promote ES_SYNCING -> ES_CONNECTED once the post-connect status
replay goes quiet (or the hard cap elapses).

On connect the controller replays every node's current status as a
burst of frames; holding ES_SYNCING until the burst settles keeps
``connection_events`` consumers from treating the replay as live
events (spurious triggers on every connect/restart; #512). Records
still update during ES_SYNCING — ``_route_message`` keeps feeding —
only the "stream is live" signal waits.

Sampled rather than event-driven so the read loop stays a plain
``async for``: every WS_SYNC_QUIET_SECONDS we check whether any
frame arrived since the last sample. A window with no new frame is
treated as quiet (including the first window on a silent
controller). WS_SYNC_MAX_SECONDS caps the wait so a perpetually
chatty controller still goes live. Cancelled by ``websocket``'s
``finally`` if the socket drops first, so a connection that never
settles never reports ES_CONNECTED.

The window is anchored to socket-open, not to the first replayed
frame: the gate assumes the replay begins within the first quiet
window (true in practice — IoX serves it from cache on the
subscribe round-trip). A pathologically delayed first frame is the
known limitation, accepted to keep the silent-controller path fast.
"""
deadline = self._loop.time() + WS_SYNC_MAX_SECONDS
seen = self._frame_count
while self.status == ES_SYNCING:
await asyncio.sleep(WS_SYNC_QUIET_SECONDS)
quiet = self._frame_count == seen
seen = self._frame_count
if quiet or self._loop.time() >= deadline:
break
if self.status == ES_SYNCING:
self.status = ES_CONNECTED

async def websocket(self, retries: int = 0) -> None:
"""Start websocket connection."""
try:
Expand All @@ -253,19 +308,28 @@ async def websocket(self, retries: int = 0) -> None:
receive_timeout=self._hbwait + WS_HB_GRACE,
ssl=self.sslcontext,
) as ws:
self.status = ES_CONNECTED
retries = 0
_LOGGER.debug("Successfully connected to websocket.")

async for msg in ws:
msg_type = msg.type
if msg_type is aiohttp.WSMsgType.TEXT:
await self._route_message(msg.data)
elif msg_type is aiohttp.WSMsgType.BINARY:
_LOGGER.warning("Unexpected binary message received.")
elif msg_type is aiohttp.WSMsgType.ERROR:
_LOGGER.error("Error during receive %s", ws.exception())
break
# Hold ES_SYNCING through the controller's post-connect status
# replay so consumers don't fire on it; watcher promotes once
# quiet (#512).
self._frame_count = 0
self.status = ES_SYNCING
self._sync_task = self._loop.create_task(self._promote_when_quiet())

try:
async for msg in ws:
msg_type = msg.type
if msg_type is aiohttp.WSMsgType.TEXT:
self._frame_count += 1
await self._route_message(msg.data)
elif msg_type is aiohttp.WSMsgType.BINARY:
_LOGGER.warning("Unexpected binary message received.")
elif msg_type is aiohttp.WSMsgType.ERROR:
_LOGGER.error("Error during receive %s", ws.exception())
break
finally:
self._cancel_sync_task()

except asyncio.CancelledError:
self.status = ES_DISCONNECTED
Expand Down
190 changes: 183 additions & 7 deletions tests/test_websocket_lifecycle.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
ES_NOT_STARTED,
ES_RECONNECTING,
ES_STOP_UPDATES,
ES_SYNCING,
)
from pyisy.events import websocket as ws_module
from pyisy.events.websocket import WS_MAX_RETRIES, WebSocketClient

# -- fixtures ---------------------------------------------------------
Expand Down Expand Up @@ -206,19 +208,38 @@ class _FakeWS:

Yields a scripted sequence of ``msg`` objects, then completes the
``async for``. ``exception()`` and ``close_code`` mimic the real
surface so the post-loop branch in ``websocket()`` can read them."""
surface so the post-loop branch in ``websocket()`` can read them.

def __init__(self, messages, exc=None, close_code: int = 1000) -> None:
``keep_open=True`` leaves the socket blocked (open) after the scripted
frames drain, until ``close()`` is awaited — this lets a test watch
the SYNCING -> CONNECTED quiet-window promotion without the read loop
tearing the socket down first. Each ``__anext__`` suspends once
(``await asyncio.sleep(0)``) so the sync watcher task gets to run."""

def __init__(self, messages, exc=None, close_code: int = 1000, keep_open: bool = False) -> None:
self._messages = list(messages)
self._exc = exc
self.close_code = close_code
self._keep_open = keep_open
self.closed = False
self._closed_evt = asyncio.Event()

def __aiter__(self):
async def _gen():
for m in self._messages:
yield m

return _gen()
return self

async def __anext__(self):
if self.closed:
raise StopAsyncIteration
if self._messages:
await asyncio.sleep(0)
return self._messages.pop(0)
if self._keep_open:
await self._closed_evt.wait()
raise StopAsyncIteration

async def close(self) -> None:
self.closed = True
self._closed_evt.set()

def exception(self):
return self._exc
Expand Down Expand Up @@ -397,6 +418,161 @@ def _exception_side_effect():
reconnect.assert_not_called()


# -- SYNCING quiet-window gate (#512) ---------------------------------


async def test_promote_when_quiet_promotes_after_idle(ws_client: WebSocketClient, monkeypatch) -> None:
"""A silent controller (replay already drained) promotes
SYNCING -> CONNECTED after a single quiet window."""
monkeypatch.setattr(ws_module, "WS_SYNC_QUIET_SECONDS", 0.01)
monkeypatch.setattr(ws_module, "WS_SYNC_MAX_SECONDS", 1.0)
ws_client._status = ES_SYNCING
ws_client._frame_count = 3 # frames already received; none arriving now
await ws_client._promote_when_quiet()
assert ws_client.status == ES_CONNECTED


async def test_promote_when_quiet_holds_until_replay_drains(ws_client: WebSocketClient, monkeypatch) -> None:
"""While the post-connect replay keeps delivering frames the watcher
holds SYNCING; once the burst goes quiet it promotes to CONNECTED.
This is the core guard against spurious triggers on every connect."""
# QUIET (0.2s) is 20x the replay frame interval (0.01s), so a false
# "quiet" reading would require the replay task to be starved for a
# whole window (~20 missed wakeups) -- not a realistic CI stall. The
# observation window below spans two full quiet windows.
monkeypatch.setattr(ws_module, "WS_SYNC_QUIET_SECONDS", 0.2)
monkeypatch.setattr(ws_module, "WS_SYNC_MAX_SECONDS", 5.0)
ws_client._status = ES_SYNCING
ws_client._frame_count = 0

stop = asyncio.Event()

async def _replay() -> None:
# Frames arrive far faster than the quiet window, so every sample
# the watcher takes sees movement -> never quiet.
while not stop.is_set():
ws_client._frame_count += 1
await asyncio.sleep(0.01)

replay = asyncio.create_task(_replay())
watcher = asyncio.create_task(ws_client._promote_when_quiet())
try:
# Two quiet windows pass while the replay is still busy.
await asyncio.sleep(0.5)
assert not watcher.done()
assert ws_client.status == ES_SYNCING

# Replay drains -> the next quiet sample promotes to CONNECTED.
stop.set()
await replay
await asyncio.wait_for(watcher, timeout=2.0)
assert ws_client.status == ES_CONNECTED
finally:
stop.set()
watcher.cancel()


async def test_promote_when_quiet_hard_cap_under_constant_traffic(
ws_client: WebSocketClient, monkeypatch
) -> None:
"""A perpetually chatty controller (frames never stop) must still
promote at the WS_SYNC_MAX_SECONDS hard cap rather than stall in
SYNCING forever."""
monkeypatch.setattr(ws_module, "WS_SYNC_QUIET_SECONDS", 0.05)
monkeypatch.setattr(ws_module, "WS_SYNC_MAX_SECONDS", 0.15)
ws_client._status = ES_SYNCING
ws_client._frame_count = 0

stop = asyncio.Event()

async def _flood() -> None:
while not stop.is_set():
ws_client._frame_count += 1
await asyncio.sleep(0.005)

flood = asyncio.create_task(_flood())
try:
await asyncio.wait_for(ws_client._promote_when_quiet(), timeout=2.0)
assert ws_client.status == ES_CONNECTED
finally:
stop.set()
await asyncio.gather(flood, return_exceptions=True)


async def test_websocket_holds_syncing_through_replay_then_connects(
ws_client: WebSocketClient, monkeypatch
) -> None:
"""End-to-end through ``websocket()``: a replayed frame is routed
(records still update) while the stream stays SYNCING, and CONNECTED
is only emitted after the quiet window."""
# A wide quiet window (0.2s) keeps the "still SYNCING" assertion below
# well clear of the watcher's first post-frame sample -- the routed
# poll breaks within a few ms, leaving ~0.2s of slack before promotion
# becomes possible, so a scheduler hiccup can't race it.
monkeypatch.setattr(ws_module, "WS_SYNC_QUIET_SECONDS", 0.2)
monkeypatch.setattr(ws_module, "WS_SYNC_MAX_SECONDS", 5.0)
ws = _FakeWS(messages=[_ws_msg(aiohttp.WSMsgType.TEXT, "<x/>")], keep_open=True)
ws_client.req_session.ws_connect = _ws_connect_returning(ws)
routed: list[str] = []

async def _capture(self, msg):
routed.append(msg)

with (
patch.object(WebSocketClient, "_route_message", _capture),
patch.object(WebSocketClient, "_reconnect"),
):
task = asyncio.create_task(ws_client.websocket())
try:
# Wait for the replayed frame to be routed into state.
for _ in range(200):
if routed:
break
await asyncio.sleep(0.005)
await asyncio.sleep(0) # flush any same-tick callbacks

# Replay routed (state synced) but the stream is not yet live.
assert routed == ["<x/>"]
assert ws_client.status == ES_SYNCING

# After the quiet window the stream goes live.
for _ in range(200):
if ws_client.status == ES_CONNECTED:
break
await asyncio.sleep(0.01)
assert ws_client.status == ES_CONNECTED
finally:
await ws.close()
await asyncio.wait_for(task, timeout=2.0)


async def test_websocket_socket_drop_before_quiet_never_connects(
ws_client: WebSocketClient, monkeypatch
) -> None:
"""If the socket drops before the replay settles, the watcher is
cancelled in the ``finally`` and CONNECTED is never emitted — a
connection that never settles must not report itself as live."""
monkeypatch.setattr(ws_module, "WS_SYNC_QUIET_SECONDS", 5.0)
monkeypatch.setattr(ws_module, "WS_SYNC_MAX_SECONDS", 10.0)
ws = _FakeWS(messages=[_ws_msg(aiohttp.WSMsgType.TEXT, "<x/>")]) # drains then closes
ws_client.req_session.ws_connect = _ws_connect_returning(ws)
notifications: list[str] = []
ws_client.isy.connection_events.notify = notifications.append

async def _capture(self, msg):
return None

with (
patch.object(WebSocketClient, "_route_message", _capture),
patch.object(WebSocketClient, "_reconnect"),
):
await ws_client.websocket()

assert ES_SYNCING in notifications
assert ES_CONNECTED not in notifications
assert ws_client._sync_task is None # cancelled in the finally


# -- helpers ----------------------------------------------------------


Expand Down
Loading