# Source code for pyisyox.runtime.ws

"""WebSocket reader loop for the eisy event stream.

Opens ``wss://{host}/rest/subscribe`` (or another configured path) using
the same :class:`pyisyox.auth.Auth` strategy the HTTP client uses, then
runs a read loop that feeds every frame to an
:class:`~pyisyox.runtime.events.EventDispatcher`. Reconnects with
exponential backoff on transport errors; refreshes auth tokens on a
401-class WebSocket handshake failure.

Auth integration:

* :class:`LocalAuth` returns ``{"auth": aiohttp.BasicAuth(...)}`` from
  ``request_kwargs`` — aiohttp's ``ws_connect`` accepts ``auth``
  directly, so the upgrade carries an ``Authorization: Basic`` header.
* :class:`PortalAuth` returns ``{"headers": {"Authorization": "Bearer
  ..."}}``. ``ws_connect`` passes ``headers`` through verbatim, so the
  bearer rides on the upgrade.

The loop is intentionally split from the parsing/dispatch logic in
:mod:`pyisyox.runtime.events` so the dispatcher can be unit-tested
without WebSocket plumbing and the reader can be unit-tested without a
real WS server.
"""

from __future__ import annotations

import asyncio
import logging
from collections.abc import Callable
from datetime import UTC, datetime
from typing import TYPE_CHECKING

import aiohttp

from pyisyox.auth import AuthError
from pyisyox.constants import EventStreamStatus
from pyisyox.logging import LOG_VERBOSE
from pyisyox.paths import SUBSCRIBE_PATH

if TYPE_CHECKING:
    from pyisyox.client import IoXClient
    from pyisyox.runtime.events import EventDispatcher

# Module-level logger; stream lifecycle transitions are logged here at DEBUG.
_LOGGER = logging.getLogger(__name__)


#: Backoff schedule applied between reconnect attempts (seconds).
#: After the last entry the reader stays at the cap (60 s). The reader's
#: position in the schedule is reset to the first entry by ``start()``
#: and by every successful connect.
_BACKOFF_SCHEDULE: tuple[float, ...] = (1.0, 2.0, 5.0, 10.0, 30.0, 60.0)

#: After the socket opens the controller replays every node's current
#: status. The stream stays ``SYNCING`` until that burst goes quiet for
#: ``_SYNC_QUIET_SECONDS`` (no frame), then flips to ``CONNECTED``.
#: ``_SYNC_MAX_SECONDS`` is a hard cap so a chatty controller can never
#: stall the stream in ``SYNCING`` forever; the cap is only checked after
#: each ``_SYNC_QUIET_SECONDS`` sleep, so it is honoured at the next
#: sample boundary rather than to the exact second. Module-level so tests
#: can monkeypatch them small.
_SYNC_QUIET_SECONDS: float = 1.0
_SYNC_MAX_SECONDS: float = 10.0


#: Signature of a stream-status callback. It is called synchronously with
#: the new :class:`EventStreamStatus`; its return value is ignored, and
#: any ``Exception`` it raises is logged and suppressed by the stream.
StatusListener = Callable[[EventStreamStatus], None]


class WebSocketEventStream:
    """Background task that pumps WebSocket frames into an :class:`EventDispatcher`.

    Lifecycle:

    1. :meth:`start` schedules the reader task and returns immediately.
    2. The task connects, hands every text frame to the dispatcher,
       reconnects after transport errors, and reports
       :class:`EventStreamStatus` transitions to registered listeners.
       After each connect the status is held at ``SYNCING`` (not
       ``CONNECTED``) until the controller's initial status replay has
       drained, so consumers don't mistake the replay for live events.
    3. :meth:`stop` cancels the task and closes any open WebSocket.

    The surface is deliberately small — the expected consumer is the
    top-level ``ISY`` glue object, which owns both the
    :class:`IoXClient` and the dispatcher.
    """

    __slots__ = (
        "_backoff_idx",
        "_client",
        "_dispatcher",
        "_frame_count",
        "_last_event_at",
        "_path",
        "_status",
        "_status_listeners",
        "_stop_requested",
        "_sync_task",
        "_task",
        "_ws",
    )

    def __init__(
        self,
        client: IoXClient,
        dispatcher: EventDispatcher,
        path: str = SUBSCRIBE_PATH,
    ) -> None:
        """Bind the reader to a client and a dispatcher.

        Args:
            client: HTTP client whose session and auth strategy drive the
                WS upgrade. The reader does not own the session
                lifecycle — the caller (typically the ISY glue) does.
            dispatcher: Destination for every received text frame.
            path: WS path. Defaults to ``/rest/subscribe``, which works
                under both auth modes. ``/api/events/subscribe`` is
                opt-in for portal mode and requires sending a
                ``{"auth": {"token": ...}}`` initial frame; that path is
                not supported here yet.
        """
        # Collaborators.
        self._client = client
        self._dispatcher = dispatcher
        self._path = path

        # Reader task / socket handles.
        self._task: asyncio.Task[None] | None = None
        self._sync_task: asyncio.Task[None] | None = None
        self._ws: aiohttp.ClientWebSocketResponse | None = None

        # Loop control.
        self._stop_requested = False
        self._backoff_idx = 0

        # Observable state.
        self._status: EventStreamStatus = EventStreamStatus.NOT_STARTED
        self._status_listeners: list[StatusListener] = []
        self._last_event_at: datetime | None = None
        #: Bumped on every text frame; the sync watcher samples it to
        #: tell whether the post-connect status replay has gone quiet.
        self._frame_count = 0

    # --- public API ----------------------------------------------------

    @property
    def status(self) -> EventStreamStatus:
        """Most recent stream status.

        Updated on every transition (initialise / connect / reconnect /
        disconnect / lost) and equal to
        :attr:`EventStreamStatus.NOT_STARTED` before :meth:`start`.
        Handy for system-health pages that want one readable status
        without subscribing to every notification.
        """
        return self._status

    @property
    def connected(self) -> bool:
        """``True`` only while the stream status is ``CONNECTED``.

        A ``False`` value does not mean the reader has given up: it may
        be reconnecting, or sitting in
        :attr:`EventStreamStatus.SYNCING` (socket open, but the
        controller's initial status replay hasn't drained yet —
        intentionally *not* "connected" so event consumers don't treat
        the replay as live changes).
        """
        return self._status == EventStreamStatus.CONNECTED

    @property
    def last_event_at(self) -> datetime | None:
        """UTC timestamp of the latest text frame, or ``None`` if none arrived yet.

        The eisy emits a heartbeat ``<control>_0</control>`` frame every
        30 seconds even when nothing else changes, so a stale value
        (more than ~60 s old) is a reasonable hint that the connection
        is broken even though the WS layer has not reported an error.
        """
        return self._last_event_at

    def add_status_listener(self, callback: StatusListener) -> Callable[[], None]:
        """Register ``callback`` to be invoked on stream-status notifications.

        Returns:
            A zero-argument function that removes the callback again;
            calling it more than once is harmless.
        """
        listeners = self._status_listeners
        listeners.append(callback)

        def _unsubscribe() -> None:
            try:
                listeners.remove(callback)
            except ValueError:
                # Already removed — unsubscribing twice is a no-op.
                pass

        return _unsubscribe

    def start(self) -> asyncio.Task[None]:
        """Start the background read loop.

        Idempotent: while a reader task is still running, that task is
        returned instead of creating a second one.
        """
        running = self._task
        if running is not None and not running.done():
            return running

        self._stop_requested = False
        self._backoff_idx = 0
        reader = asyncio.create_task(self._run(), name="pyisyox-ws-reader")
        self._task = reader
        return reader

    async def stop(self) -> None:
        """Stop the read loop, close any open WebSocket, and report ``DISCONNECTED``."""
        self._stop_requested = True

        socket = self._ws
        if socket is not None and not socket.closed:
            with _suppress_aiohttp_close():
                await socket.close()

        reader = self._task
        if reader is not None:
            reader.cancel()
            with _suppress_cancellation():
                await reader
            self._task = None

        self._notify(EventStreamStatus.DISCONNECTED)

    # --- read loop -----------------------------------------------------

    async def _run(self) -> None:
        """Main coroutine: connect, read, back off, reconnect — until stopped."""
        try:
            while not self._stop_requested:
                self._notify(EventStreamStatus.INITIALIZING)
                try:
                    await self._connect_and_read()
                except AuthError:
                    _LOGGER.warning("WS auth failed and could not be recovered; giving up")
                    self._notify(EventStreamStatus.RECONNECT_FAILED)
                    return
                except Exception:  # pylint: disable=broad-except
                    # asyncio.CancelledError derives from BaseException,
                    # so it is not caught here: stop() can cancel the
                    # task cleanly without this loop swallowing it.
                    _LOGGER.exception("WS read loop hit unexpected error; will reconnect")

                # Copy into a local so mypy doesn't keep `_stop_requested`
                # narrowed to its loop-entry value across the await above.
                halt_requested: bool = self._stop_requested
                if halt_requested:
                    break

                self._notify(EventStreamStatus.LOST_CONNECTION)
                await self._sleep_with_backoff()
                self._notify(EventStreamStatus.RECONNECTING)
        except asyncio.CancelledError:
            # Cancelled by stop(), which emits the DISCONNECTED
            # notification itself.
            pass

    async def _connect_and_read(self) -> None:
        """Run one connect-and-read cycle; returns once the WS has closed.

        Raises:
            AuthError: the handshake was rejected with 401 and the auth
                strategy reported that it could not recover.
        """
        url = self._ws_url()
        kwargs = await self._auth_kwargs()
        try:
            self._ws = await self._client.session.ws_connect(url, **kwargs)
        except aiohttp.WSServerHandshakeError as exc:
            if exc.status != 401:
                raise
            # Auth-side recovery (token refresh / re-login), then exactly
            # one retry of the upgrade with fresh credentials.
            recovered = await self._client.auth.handle_unauthorized(
                self._client.session, self._client.base_url
            )
            if not recovered:
                raise AuthError(f"WS auth could not recover from 401 on {url}") from exc
            kwargs = await self._auth_kwargs()
            self._ws = await self._client.session.ws_connect(url, **kwargs)

        try:
            self._backoff_idx = 0  # a successful connect resets the backoff
            # The socket is open, but the controller now replays every
            # node's current status — a burst of ST/DON/DOF frames that
            # are NOT live changes. Stay in SYNCING until the burst
            # drains so event consumers don't fire on the replay
            # (spurious automation triggers on every connect/restart).
            self._frame_count = 0
            self._notify(EventStreamStatus.SYNCING)
            self._sync_task = asyncio.create_task(
                self._promote_when_quiet(), name="pyisyox-ws-sync"
            )

            socket = self._ws
            async for msg in socket:
                if self._stop_requested:
                    break

                kind = msg.type
                if kind == aiohttp.WSMsgType.TEXT:
                    payload = msg.data
                    self._last_event_at = datetime.now(UTC)
                    self._frame_count += 1
                    if _LOGGER.isEnabledFor(LOG_VERBOSE):
                        _LOGGER.log(LOG_VERBOSE, "WS frame: %s", payload)
                    self._dispatcher.feed(payload)
                    continue
                if kind in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.CLOSING):
                    break
                if kind == aiohttp.WSMsgType.ERROR:
                    _LOGGER.warning("WS message error: %s", socket.exception())
                    break
                # BINARY/PING/PONG — ignored.
        finally:
            # Stop the sync watcher first so it can never promote a
            # socket that is going away.
            watcher = self._sync_task
            if watcher is not None:
                watcher.cancel()
                with _suppress_cancellation():
                    await watcher
                self._sync_task = None

            # Detach the socket before closing it.
            socket_to_close = self._ws
            self._ws = None
            if socket_to_close is not None and not socket_to_close.closed:
                with _suppress_aiohttp_close():
                    await socket_to_close.close()

    async def _promote_when_quiet(self) -> None:
        """Promote ``SYNCING`` to ``CONNECTED`` once the status replay settles.

        Polls rather than reacting per frame, so the hot read loop stays
        a plain ``async for``: after every ``_SYNC_QUIET_SECONDS`` sleep
        the current ``_frame_count`` is compared with the previous
        sample. An unchanged count means the window was quiet — that
        includes a first window with no frames at all (a silent
        controller is promoted after one window) and a replay that
        starts and finishes inside a single window.

        ``_SYNC_MAX_SECONDS`` bounds the wait so a permanently chatty
        controller still goes live. The deadline is only checked after
        each sleep, so the effective cap is the first sample boundary at
        or past the deadline — fine because quiet ≪ max in the real
        configuration.

        :meth:`_connect_and_read` cancels this task in its ``finally``
        when the socket drops first, so a connection that never settles
        never reports ``CONNECTED``.
        """
        loop = asyncio.get_running_loop()
        deadline = loop.time() + _SYNC_MAX_SECONDS
        previous_count = self._frame_count

        while not self._stop_requested:
            await asyncio.sleep(_SYNC_QUIET_SECONDS)

            # stop() may flip the flag during the sleep; copy it into a
            # local so mypy doesn't keep the loop-entry narrowing (same
            # idiom as `_run`).
            halt_requested: bool = self._stop_requested
            if halt_requested:
                return

            current_count = self._frame_count
            settled = current_count == previous_count
            previous_count = current_count
            if settled or loop.time() >= deadline:
                break

        if self._stop_requested:
            return
        if self._status == EventStreamStatus.SYNCING:
            self._notify(EventStreamStatus.CONNECTED)

    async def _auth_kwargs(self) -> dict:
        """Ensure the client is authenticated, then return the aiohttp request kwargs."""
        # `_authenticate_once` is deliberately reached across the client
        # boundary: the WS reader needs the same lazy-auth handshake the
        # HTTP fan-out uses, and making it public would invite consumers
        # to call it directly.
        await self._client._authenticate_once()  # pylint: disable=protected-access
        kwargs = await self._client.auth.request_kwargs(
            self._client.session, self._client.base_url
        )
        return kwargs

    def _ws_url(self) -> str:
        """Build the WebSocket URL from the client's HTTP base URL and the path.

        ``https://`` becomes ``wss://`` and ``http://`` becomes ``ws://``;
        a base URL with any other prefix is used unchanged.
        """
        base = self._client.base_url
        for http_prefix, ws_prefix in (("https://", "wss://"), ("http://", "ws://")):
            if base.startswith(http_prefix):
                remainder = base[len(http_prefix) :]
                return f"{ws_prefix}{remainder}{self._path}"
        return base + self._path

    async def _sleep_with_backoff(self) -> None:
        """Sleep before the next reconnect and advance along ``_BACKOFF_SCHEDULE``."""
        # Clamp to the last entry so the delay stays at the cap.
        slot = min(self._backoff_idx, len(_BACKOFF_SCHEDULE) - 1)
        delay = _BACKOFF_SCHEDULE[slot]
        self._backoff_idx += 1
        _LOGGER.debug("WS reconnect in %.1fs (attempt %d)", delay, self._backoff_idx)
        await asyncio.sleep(delay)

    def _notify(self, status: EventStreamStatus) -> None:
        """Record ``status`` and fan it out to listeners, suppressing their errors.

        Each real transition is logged at DEBUG (``pyisyox.runtime.ws``)
        so the connect → SYNCING → CONNECTED → reconnect sequence shows
        up in consumer debug logs without attaching a listener. A status
        re-notified with an unchanged value (e.g. ``INITIALIZING`` on
        each reconnect attempt) is not logged again, but listeners are
        still called.
        """
        previous = self._status
        if previous != status:
            _LOGGER.debug("WS stream status: %s -> %s", previous, status)
        self._status = status

        # Iterate over a snapshot so a listener may unsubscribe itself
        # while being called.
        for listener in tuple(self._status_listeners):
            try:
                listener(status)
            except Exception:  # pylint: disable=broad-except
                _LOGGER.exception("WS status listener raised; suppressing")
# --- small context-manager helpers ---------------------------------------


class _suppress_aiohttp_close:
    """Context manager that swallows any ``Exception`` from a defensive ``ws.close()``.

    Closing an already-closed WS is a no-op in current aiohttp, but
    older versions raise; suppressing keeps the cleanup path safe across
    versions without needing a precise version pin. ``BaseException``
    subclasses that are not ``Exception`` (e.g. cancellation) still
    propagate.
    """

    def __enter__(self) -> None:
        """Nothing to set up."""
        return None

    def __exit__(self, exc_type: object, exc: object, tb: object) -> bool:
        """Return ``True`` (suppress) only when an ``Exception`` subclass was raised."""
        if exc_type is None:
            # Clean exit — nothing to suppress.
            return False
        return issubclass(exc_type, Exception)  # type: ignore[arg-type]


class _suppress_cancellation:
    """Context manager that swallows ``asyncio.CancelledError`` from awaiting a cancelled task."""

    def __enter__(self) -> None:
        """Nothing to set up."""
        return None

    def __exit__(self, exc_type: object, exc: object, tb: object) -> bool:
        """Return ``True`` (suppress) only for exactly ``asyncio.CancelledError``."""
        cancelled = exc_type is asyncio.CancelledError
        return cancelled