"""High-level handle for an IoX 6+ controller.
:class:`Controller` is the single user-facing entry point that
composes the lower layers:
* :class:`pyisyox.auth.Auth` — credentials and token lifecycle.
* :class:`pyisyox.client.IoXClient` — JSON-first HTTP client with the
initial-load orchestrator.
* :class:`pyisyox.runtime.EventDispatcher` — parses ``/rest/subscribe``
frames and overlays property updates onto the node registry.
* :class:`pyisyox.runtime.WebSocketEventStream` — runs the WS read
loop with auto-reconnect.
* :class:`pyisyox.runtime.Node` — user handles for individual devices,
with editor-validated :meth:`Node.send_command`.
A typical consumer (HA Core, hacs-isy994, a CLI) constructs one
``Controller``, ``await``s :meth:`connect`, then drives nodes through
``controller.nodes[address].send_command(...)`` and subscribes to
event/status callbacks. WebSocket frames mutate
``controller.nodes[...].properties`` in place, so attribute reads
always reflect the latest controller state.
"""
from __future__ import annotations
import asyncio
import logging
from dataclasses import asdict
from typing import TYPE_CHECKING, Any
import aiohttp
from pyisyox.client import (
ClientError,
IoXClient,
NodeType,
VariableField,
VariableRecord,
)
from pyisyox.helpers.session import build_sslcontext
from pyisyox.paths import PROFILES_PATH, SUBSCRIBE_PATH
from pyisyox.runtime.events import EventDispatcher
from pyisyox.runtime.folder import Folder
from pyisyox.runtime.group import Group
from pyisyox.runtime.network_resource import NetworkResource
from pyisyox.runtime.node import Node
from pyisyox.runtime.program import Program, ProgramCommand, ProgramFolder
from pyisyox.runtime.variable import Variable
from pyisyox.runtime.ws import WebSocketEventStream
from pyisyox.schema.profile import Profile, ProfileMergeResult
if TYPE_CHECKING:
from collections.abc import Callable
from pyisyox.auth import Auth
from pyisyox.client import ControllerConfig, LoadResult
from pyisyox.runtime.events import (
Event,
EventListener,
NodeLifecycleListener,
ProgramStatusListener,
VariableTableChangeEvent,
VariableTableChangeListener,
)
from pyisyox.runtime.ws import StatusListener
_LOGGER = logging.getLogger(__name__)
[docs]
class ControllerNotConnectedError(RuntimeError):
"""Raised when accessing live data before :meth:`Controller.connect`
has populated it."""
[docs]
class Controller:
"""Top-level handle for one IoX 6+ controller (eisy / Polisy).
Construction is cheap and synchronous; the network round-trips
happen in :meth:`connect`. Disconnect is symmetric: :meth:`stop`
closes the WebSocket and (if the controller owns the aiohttp
session) closes that too.
Threading: this class is async-only. The WS reader runs as a
background ``asyncio.Task``; do not block the event loop in event
or status callbacks — schedule heavier work on a separate task.
"""
__slots__ = (
"_auth",
"_background_tasks",
"_base_url",
"_client",
"_dispatcher",
"_loaded",
"_owns_session",
"_session",
"_tls_version",
"_verify_ssl",
"_ws",
"_ws_path",
)
def __init__(
self,
base_url: str,
auth: Auth,
session: aiohttp.ClientSession | None = None,
ws_path: str = SUBSCRIBE_PATH,
tls_version: float | None = None,
verify_ssl: bool = False,
) -> None:
"""Configure the controller.
Args:
base_url: Scheme + host + port (no trailing slash). Use
``"https://eisy.local:443"`` for portal mode or
``"https://eisy.local:8443"`` for local mode.
auth: A configured :class:`pyisyox.auth.Auth` instance —
typically :class:`pyisyox.auth.PortalAuth` (default,
JWT bearer) or :class:`pyisyox.auth.LocalAuth` (HTTP
basic, feature-degraded).
session: An aiohttp ``ClientSession`` to reuse. When
``None``, the controller creates and owns one
(configured via ``tls_version`` and ``verify_ssl``)
and will close it on :meth:`stop`. When provided,
``tls_version`` and ``verify_ssl`` are ignored —
consumers must configure SSL on their own session.
ws_path: WebSocket path. Default ``/rest/subscribe`` works
under both auth modes; ``/api/events/subscribe`` is
an opt-in JSON envelope path that requires portal
JWT auth and an initial frame handshake.
tls_version: ``None`` (default) auto-negotiates TLS 1.2 or
1.3. Pin to ``1.2`` or ``1.3`` for reproducible
behaviour. Used only when the controller creates its
own session.
verify_ssl: ``False`` (default) accepts the eisy's
self-signed certificate. ``True`` enforces strict
verification — for users with their own CA. Used only
when the controller creates its own session.
"""
self._base_url = base_url.rstrip("/")
self._auth = auth
self._session = session
self._owns_session = session is None
self._ws_path = ws_path
self._tls_version = tls_version
self._verify_ssl = verify_ssl
self._client: IoXClient | None = None
self._dispatcher: EventDispatcher | None = None
self._ws: WebSocketEventStream | None = None
self._loaded: LoadResult | None = None
# Strong-ref jail for fire-and-forget tasks (auto-refresh on
# variable-table-change). Without a reference asyncio may GC
# the task before it runs to completion. Tasks remove
# themselves on done.
self._background_tasks: set[asyncio.Task[Any]] = set()
# --- lifecycle -----------------------------------------------------
[docs]
async def connect(self, *, start_websocket: bool = True) -> None:
"""Authenticate, run the initial load, and (optionally) open the WS.
Builds the IoXClient, calls :meth:`IoXClient.connect` to fetch
``/api/config``, ``/rest/profiles``, ``/api/nodes``,
``/rest/status``, programs/triggers/variables in parallel, and
merges the status overlay. Then constructs the
:class:`EventDispatcher` over the same node registry the
runtime :class:`Node` instances will read from, so WebSocket
frames mutate properties in place.
Args:
start_websocket: When ``True`` (default), the WS reader
starts in the background after the initial load
completes. Pass ``False`` for one-shot reads (CLI
tools, tests) where the consumer doesn't need live
updates.
Raises:
Any error from :meth:`IoXClient.connect` (auth failure,
HTTP failure, malformed payload) propagates unchanged.
"""
if self._session is None:
self._session = self._build_owned_session()
self._owns_session = True
self._client = IoXClient(self._base_url, self._auth, self._session)
self._loaded = await self._client.connect()
# Bind the dispatcher to the same dict the LoadResult holds —
# so runtime Nodes (which read from LoadResult.nodes) see live
# updates without an explicit notification path.
self._dispatcher = EventDispatcher(
self._loaded.nodes,
programs=self._loaded.programs,
variables=self._loaded.variables,
groups=self._loaded.groups,
)
# Auto-refresh the affected variable type whenever the
# controller emits VARIABLE_TABLE_CHANGED (create / delete /
# precision change). The event handler is sync; the refresh
# is fire-and-forget — failures land in the logger, not in
# the dispatcher loop.
self._dispatcher.add_variable_table_change_listener(self._on_variable_table_changed)
if start_websocket:
self._ws = WebSocketEventStream(self._client, self._dispatcher, path=self._ws_path)
self._ws.start()
[docs]
async def stop(self) -> None:
"""Stop the WebSocket, log out, and (if we own it) close the session.
Idempotent — safe to call from cleanup paths even if
:meth:`connect` partially failed.
"""
if self._ws is not None:
await self._ws.stop()
self._ws = None
# Best-effort logout to invalidate any server-side session
# (PortalAuth posts /api/logout; LocalAuth no-ops). Run before
# closing the session because PortalAuth needs it for the call.
if self._session is not None:
try:
await self._auth.close(self._session, self._base_url)
except Exception: # pylint: disable=broad-except
_LOGGER.debug("auth.close() raised; ignoring during shutdown", exc_info=True)
if self._owns_session and self._session is not None:
await self._session.close()
self._session = None
# Drop the loaded snapshot so accessing properties after stop
# raises a clear error instead of returning stale data.
self._loaded = None
self._dispatcher = None
self._client = None
# --- accessors -----------------------------------------------------
@property
def connected(self) -> bool:
"""True between :meth:`connect` returning successfully and
:meth:`stop` being called."""
return self._loaded is not None
@property
def websocket(self) -> WebSocketEventStream | None:
"""The active WebSocket stream, or ``None``.
Returns the live :class:`WebSocketEventStream` when
:meth:`connect` was called with ``start_websocket=True`` and
:meth:`stop` hasn't run yet. ``None`` for one-shot reads
(CLI tools, snapshot tests) that opted out of the WS
upgrade. Consumers polling stream health (HA system_health,
diagnostics) read ``websocket.status`` /
``websocket.last_event_at`` directly.
"""
return self._ws
@property
def base_url(self) -> str:
"""The controller URL passed to ``__init__``."""
return self._base_url
@property
def config(self) -> ControllerConfig:
"""Decoded ``/api/config`` slice (uuid, version, portalHost)."""
return self._loaded_or_raise().config
@property
def name(self) -> str:
"""User-assigned controller name (e.g. ``"Main eisy"``).
Sourced from the ``<name>`` of the root group in
``/rest/nodes`` (the same value the eisy admin UI shows and
that the legacy ``/rest/config`` ``<configuration><root><name>``
path carried). Empty string when the controller hasn't been
named or the legacy endpoint isn't available.
Consumers driving HA device names should prefer this over the
hostname so users see the friendly label they set on the
controller, with the hostname as a fallback.
"""
return self._loaded_or_raise().root_name
@property
def profile(self) -> Profile:
"""The decoded ``/rest/profiles`` blob with built nodedef lookup."""
return self._loaded_or_raise().profile
@property
def nodes(self) -> dict[str, Node]:
"""Map of node address → runtime :class:`Node`.
Built lazily on first access from the loaded :class:`NodeRecord`
registry; subsequent accesses return the cached dict so
identity is stable across calls (consumers can hold references
to specific nodes safely).
"""
loaded = self._loaded_or_raise()
client = self._client
if client is None: # pragma: no cover — connect() sets both
raise ControllerNotConnectedError("controller has no client")
return {
address: Node.from_record(record, loaded.profile, client)
for address, record in loaded.nodes.items()
}
@property
def groups(self) -> dict[str, Group]:
"""Map of group address → runtime :class:`Group` (IoX scenes).
Sourced from ``/rest/nodes`` XML at connect time. The
controller-self group (``flag="12"``) is filtered out.
"""
loaded = self._loaded_or_raise()
client = self._client
if client is None: # pragma: no cover — connect() sets both
raise ControllerNotConnectedError("controller has no client")
return {
address: Group.from_record(record, loaded.profile, client, nodes=loaded.nodes)
for address, record in loaded.groups.items()
}
@property
def folders(self) -> dict[str, Folder]:
"""Map of folder address → runtime :class:`Folder` (org tree only)."""
loaded = self._loaded_or_raise()
return {address: Folder(record) for address, record in loaded.folders.items()}
@property
def programs(self) -> dict[str, Program]:
"""Map of program id → runtime :class:`Program`.
Folders share the same id space but live under
:attr:`program_folders`; this map only contains executable
programs (``is_folder=False``).
"""
loaded = self._loaded_or_raise()
client = self._client
if client is None: # pragma: no cover
raise ControllerNotConnectedError("controller has no client")
return {
address: Program(record, client)
for address, record in loaded.programs.items()
if not record.is_folder
}
@property
def program_folders(self) -> dict[str, ProgramFolder]:
"""Map of folder id → runtime :class:`ProgramFolder`.
The synthetic root folder (``"My Programs"`` on stock eisy
firmware) is included — consumers walking the tree from the
controller can use it as the root anchor.
"""
loaded = self._loaded_or_raise()
client = self._client
if client is None: # pragma: no cover
raise ControllerNotConnectedError("controller has no client")
return {
address: ProgramFolder(record, client)
for address, record in loaded.programs.items()
if record.is_folder
}
@property
def triggers(self) -> list[dict]:
"""Raw ``/api/triggers`` data list — program AST as JSON."""
return self._loaded_or_raise().triggers
@property
def variables(self) -> dict[str, dict[str, Variable]]:
"""Map of variable type → id → typed :class:`Variable` wrapper.
Outer key is ``"1"`` (integer) or ``"2"`` (state); inner key is
the variable id within that type. Each :class:`Variable` shares
its underlying :class:`VariableRecord` with the controller's
loaded state — writes via the wrapper's mutation coroutines
update the record in place so subsequent reads reflect the new
value without waiting for a WS frame.
Returns an empty inner dict for a type the controller has no
variables in.
"""
loaded = self._loaded_or_raise()
client = self._client
if client is None: # pragma: no cover
raise ControllerNotConnectedError("controller has no client")
return {
type_id: {vid: Variable.from_record(record, client) for vid, record in records.items()}
for type_id, records in loaded.variables.items()
}
@property
def network_resources(self) -> dict[str, NetworkResource]:
"""Map of resource id → runtime :class:`NetworkResource`.
Empty when the controller has no networking module enabled —
the optional endpoint either 404s or returns an empty
``<NetConfig/>``, both flattened to ``{}`` here.
"""
loaded = self._loaded_or_raise()
client = self._client
if client is None: # pragma: no cover
raise ControllerNotConnectedError("controller has no client")
return {
address: NetworkResource(record, client) for address, record in loaded.network_resources.items()
}
# --- snapshot -----------------------------------------------------
[docs]
def to_dict(self) -> dict[str, Any]:
"""Flatten the full controller state to a JSON-compatible dict.
Aggregates every loaded collection (nodes / groups / folders /
programs / program_folders / variables / network_resources)
plus the controller's own config + WebSocket health. Each
nested object's structural fields come from its own
:meth:`to_dict` so the same code path drives the
``pyisyox -m … --dump`` CLI flag and consumer diagnostics.
Raises :class:`ControllerNotConnectedError` when called before
:meth:`connect` (no loaded state to snapshot).
"""
ws = self._ws
return {
"name": self.name,
"config": asdict(self.config),
"connected": self.connected,
"websocket": {
"status": ws.status.value if ws is not None else None,
"last_event_at": ws.last_event_at.isoformat()
if ws is not None and ws.last_event_at is not None
else None,
},
"profile": self.profile.to_dict(),
"nodes": {addr: node.to_dict() for addr, node in self.nodes.items()},
"groups": {addr: group.to_dict() for addr, group in self.groups.items()},
"folders": {addr: folder.to_dict() for addr, folder in self.folders.items()},
"programs": {addr: program.to_dict() for addr, program in self.programs.items()},
"program_folders": {addr: folder.to_dict() for addr, folder in self.program_folders.items()},
"variables": {
type_id: {vid: var.to_dict() for vid, var in vars_.items()}
for type_id, vars_ in self.variables.items()
},
"network_resources": {
addr: resource.to_dict() for addr, resource in self.network_resources.items()
},
}
# --- dynamic profile reload ---------------------------------------
[docs]
async def refresh_profile(self) -> ProfileMergeResult:
"""Re-fetch ``/rest/profiles`` and merge updates into the live profile.
Designed for PG3 dynamic profile reload — when a plugin
updates its nodedefs at runtime, consumers detect the
controller-side signal (the WS event control code is plugin-
+ version-specific; capture it from a real reload to wire up
an automatic listener) and call this method to absorb the
change.
The live :class:`Profile` is mutated in place: existing
:class:`pyisyox.runtime.Node` instances that resolved against
a NodeDef before the reload now see the new NodeDef on their
next attribute access. The returned :class:`ProfileMergeResult`
lists the lookup-key triples that were added vs replaced so
consumers can re-classify or invalidate any caches keyed on
nodedef.
Returns:
A :class:`ProfileMergeResult` summarising the diff. Empty
(``result.changed is False``) when the controller's
response was identical to what we had.
Raises:
ControllerNotConnectedError: When called before
:meth:`connect`.
ClientError / HTTPError / AuthError: As with any HTTP
round-trip.
"""
loaded = self._loaded_or_raise()
client = self._client
if client is None: # pragma: no cover — connect() sets both
raise ControllerNotConnectedError("controller has no client")
# _get_json is intentionally accessed across the client boundary —
# the Controller is the only consumer that legitimately needs to
# re-issue a load-time endpoint outside of the connect() flow.
new_raw = await client._get_json( # pylint: disable=protected-access
PROFILES_PATH
)
incoming = Profile.load_from_json(new_raw)
return loaded.profile.merge(incoming)
# --- subscriptions -------------------------------------------------
[docs]
def add_event_listener(self, callback: EventListener) -> Callable[[], None]:
"""Subscribe to every parsed WebSocket event.
The dispatcher applies the property update *before* calling
listeners, so callbacks observing a property event can read
the new value via ``controller.nodes[address].properties[id]``
synchronously.
Returns:
An unsubscribe function. Calling it removes ``callback``.
Raises:
ControllerNotConnectedError: When called before
:meth:`connect` or after :meth:`stop`.
"""
if self._dispatcher is None:
raise ControllerNotConnectedError("add_event_listener requires connect() to have completed")
return self._dispatcher.add_listener(callback)
[docs]
def add_status_listener(self, callback: StatusListener) -> Callable[[], None]:
"""Subscribe to WebSocket lifecycle status changes.
Returns:
An unsubscribe function.
Raises:
ControllerNotConnectedError: When called before
:meth:`connect` (or after :meth:`stop`), or when
:meth:`connect` was called with ``start_websocket=False``.
"""
if self._ws is None:
raise ControllerNotConnectedError(
"add_status_listener requires the WebSocket reader to be started"
)
return self._ws.add_status_listener(callback)
[docs]
def add_node_lifecycle_listener(self, callback: NodeLifecycleListener) -> Callable[[], None]:
"""Subscribe to node-tree lifecycle changes (add / remove / rename).
The eisy emits ``<control>_3</control>`` frames when nodes
appear or disappear (typically driven by PG3 plugin reloads).
The dispatcher does **not** auto-update the live registry —
consumers decide whether to call :meth:`refresh` or live with
a stale view until the user manually reloads the integration.
HA Core's intended UX is to register a Repair issue on the
first lifecycle event with ``ev.requires_reload is True`` and
clear it once the user-initiated reload completes.
Returns:
An unsubscribe function.
Raises:
ControllerNotConnectedError: When called before :meth:`connect`.
"""
if self._dispatcher is None:
raise ControllerNotConnectedError(
"add_node_lifecycle_listener requires connect() to have completed"
)
return self._dispatcher.add_lifecycle_listener(callback)
[docs]
def add_program_status_listener(self, callback: ProgramStatusListener) -> Callable[[], None]:
"""Subscribe to program-status changes (the ``<control>_1</control>``
action ``"0"`` frames).
The dispatcher mutates the matching ``ProgramRecord.status`` /
``running`` in place before firing, so consumers reading
``controller.programs[id].status`` from the callback see the
new value.
Returns:
An unsubscribe function.
Raises:
ControllerNotConnectedError: When called before :meth:`connect`.
"""
if self._dispatcher is None:
raise ControllerNotConnectedError(
"add_program_status_listener requires connect() to have completed"
)
return self._dispatcher.add_program_status_listener(callback)
[docs]
def add_variable_table_change_listener(self, callback: VariableTableChangeListener) -> Callable[[], None]:
"""Subscribe to ``_1``/``9`` ``VARIABLE_TABLE_CHANGED`` frames.
These fire on variable create / delete / rename / precision
change — not on per-value writes (those use ``_1``/``6`` and
``_1``/``7``). The :class:`Controller` already wires its own
listener that auto-refreshes
``self.variables[type_id]``; consumers add their own listener
on top to drive UI invalidation, telemetry, etc.
Returns:
An unsubscribe function.
Raises:
ControllerNotConnectedError: When called before
:meth:`connect`.
"""
if self._dispatcher is None:
raise ControllerNotConnectedError(
"add_variable_table_change_listener requires connect() to have completed"
)
return self._dispatcher.add_variable_table_change_listener(callback)
# --- mutation -----------------------------------------------------
[docs]
async def refresh(self) -> ProfileMergeResult:
"""Re-run the parallel load fan-out and merge results into the
live :class:`LoadResult`.
Use after a :class:`NodeLifecycleEvent` with
``requires_reload=True`` to absorb the new node tree without
re-authenticating. The live :class:`Profile` is mutated in
place (see :meth:`Profile.merge`); the ``nodes`` / ``groups``
/ ``folders`` / ``programs`` / ``triggers`` / ``variables``
registries on the LoadResult are updated to match the fresh
snapshot. The dispatcher's binding to ``LoadResult.nodes``
survives because we mutate the dict in place.
Returns:
The :class:`ProfileMergeResult` from the schema merge —
useful for tracking which nodedefs changed.
Raises:
ControllerNotConnectedError: When called before :meth:`connect`.
"""
loaded = self._loaded_or_raise()
client = self._client
if client is None: # pragma: no cover — connect() sets both
raise ControllerNotConnectedError("controller has no client")
fresh = await client.load(loaded.config)
diff = loaded.profile.merge(fresh.profile)
# Mutate node registry in place so the EventDispatcher's
# binding stays valid. Other registries can be replaced.
loaded.nodes.clear()
loaded.nodes.update(fresh.nodes)
loaded.groups = fresh.groups
# groups is replaced (not mutated in place), so the dispatcher's
# member→groups reverse index must be rebuilt or post-reload
# scene-membership changes would be missed.
if self._dispatcher is not None:
self._dispatcher.update_groups(fresh.groups)
loaded.folders = fresh.folders
loaded.programs = fresh.programs
loaded.triggers = fresh.triggers
loaded.variables = fresh.variables
loaded.network_resources = fresh.network_resources
return diff
[docs]
async def send_program_command(self, program_id: str, command: ProgramCommand | str) -> None:
"""Send a program / folder command via the legacy REST endpoint.
Wire shape: ``GET /rest/programs/{id}/{command}``. See
:class:`pyisyox.runtime.ProgramCommand` for the typed command
set; bare strings are accepted too (the StrEnum members are
themselves strings, so ``ProgramCommand.RUN_THEN == "runThen"``
— pass either form).
Lower-level than :meth:`Program.run` etc.; useful for
consumers that hold ids without a Program wrapper (e.g. an
HA service receiving raw ids).
"""
self._loaded_or_raise()
client = self._client
if client is None: # pragma: no cover
raise ControllerNotConnectedError("controller has no client")
await client.run_program_command(program_id, command)
[docs]
async def run_network_resource(self, resource_id: str | int) -> None:
"""Fire a network resource by id.
Wire shape: ``GET /rest/networking/resources/{id}``. Treat as
fire-and-forget — the controller acknowledges receipt only,
not the result of the underlying HTTP / TCP / UDP fire.
"""
self._loaded_or_raise()
client = self._client
if client is None: # pragma: no cover
raise ControllerNotConnectedError("controller has no client")
await client.run_network_resource(resource_id)
[docs]
async def set_variable_value(self, var_type: int | str, var_id: int | str, value: int) -> None:
"""Set the current value of a controller variable.
Wire shape: ``POST /api/variables/{type}/{id}`` with body
``{"value": <int>}``.
Args:
var_type: ``1`` (integer) or ``2`` (state). Strings accepted.
var_id: Variable id within the type.
value: New value to write.
Raises:
ControllerNotConnectedError: When called before :meth:`connect`.
HTTPError / ClientError: On wire failures.
"""
await self._post_variable(var_type, var_id, {VariableField.VALUE: int(value)})
[docs]
async def set_variable_init(self, var_type: int | str, var_id: int | str, init: int) -> None:
"""Set the initial / restore-on-startup value of a variable.
Wire shape: ``POST /api/variables/{type}/{id}`` with
``{"init": <int>}``.
"""
await self._post_variable(var_type, var_id, {VariableField.INIT: int(init)})
[docs]
async def rename_variable(self, var_type: int | str, var_id: int | str, name: str) -> None:
"""Rename a variable.
Wire shape: ``POST /api/variables/{type}/{id}`` with
``{"name": "<str>"}``.
"""
await self._post_variable(var_type, var_id, {VariableField.NAME: name})
[docs]
async def create_variable(
self,
var_type: int | str,
name: str,
*,
prec: int = 0,
) -> Variable:
"""Create a new variable on the controller.
Wire shape: ``PUT /api/variables/{type}`` with body
``{"name": "<str>", "prec": <int>}``. The controller assigns
the id and returns the new record.
Inserts a :class:`VariableRecord` into the loaded registry
in place (so the dispatcher's binding survives) and returns
a :class:`Variable` wrapper bound to it. Per issue #125, the
controller silently drops ``init`` / ``value`` keys on PUT —
call :meth:`Variable.set_value` / :meth:`Variable.set_init`
on the returned wrapper to populate them.
Raises:
ControllerNotConnectedError: When called before :meth:`connect`.
ClientError: When the response payload is missing the new id.
"""
loaded = self._loaded_or_raise()
client = self._client
if client is None: # pragma: no cover
raise ControllerNotConnectedError("controller has no client")
response = await client.create_variable(var_type, name, prec=prec)
data = response.get("data") if isinstance(response, dict) else None
if not isinstance(data, dict):
raise ClientError(f"create_variable response missing data: {response!r}")
raw_id = data.get("id")
new_id = str(raw_id).strip() if raw_id not in (None, "") else ""
if not new_id:
raise ClientError(f"create_variable response missing id: {response!r}")
type_str = str(var_type)
record = VariableRecord(
type_id=type_str,
id=new_id,
name=str(data.get("name", name)),
# PUT silently drops init / value (issue #125 capture
# confirms a fresh variable is always val=0 / init=0).
value=0,
init=0,
precision=int(data.get("prec", prec) or 0),
ts="",
)
loaded.variables.setdefault(type_str, {})[new_id] = record
return Variable.from_record(record, client)
[docs]
async def refresh_variables(self, var_type: int | str) -> None:
"""Re-fetch one variable type and mutate the registry in place.
Wire shape: ``GET /api/variables/{type}``. Mutates
``self._loaded.variables[type]`` in place (clear + update) so
the dispatcher's binding to the same dict survives — a full
:meth:`refresh` would replace the dict and break per-record
WS overlay routing.
Used internally by the auto-wired
``VARIABLE_TABLE_CHANGED`` listener; also callable directly
when a consumer wants to force a re-sync.
"""
loaded = self._loaded_or_raise()
client = self._client
if client is None: # pragma: no cover
raise ControllerNotConnectedError("controller has no client")
type_str = str(var_type)
fresh = await client.get_variables_type(type_str)
bucket = loaded.variables.setdefault(type_str, {})
bucket.clear()
bucket.update(fresh)
def _on_variable_table_changed(self, event: VariableTableChangeEvent) -> None:
"""Auto-wired listener: refresh the affected variable type.
Sync handler (the dispatcher's contract); schedules the
async refresh as a background task. Failures land in the
log — they don't break the dispatcher loop or future
listeners.
"""
try:
loop = asyncio.get_running_loop()
except RuntimeError: # pragma: no cover — listener fires inside the WS reader task
_LOGGER.debug("variable-table-change fired outside an event loop; skipping refresh")
return
task = loop.create_task(self._auto_refresh_variables(event.type_id))
self._background_tasks.add(task)
task.add_done_callback(self._background_tasks.discard)
async def _auto_refresh_variables(self, type_id: str) -> None:
"""Wrapper that swallows + logs failures from the auto-refresh path."""
try:
await self.refresh_variables(type_id)
except Exception: # pylint: disable=broad-except
_LOGGER.exception("auto-refresh of variables[type=%s] failed", type_id)
async def _post_variable(self, var_type: int | str, var_id: int | str, body: dict) -> None:
"""Internal: route a variable mutation through the IoXClient."""
self._loaded_or_raise()
client = self._client
if client is None: # pragma: no cover
raise ControllerNotConnectedError("controller has no client")
await client.post_variable_update(var_type, var_id, body)
[docs]
async def rename_node(self, address: str, name: str) -> None:
"""Rename a node.
Wire shape: ``POST /api/nodes/{address}`` with
``{"name": "<str>", "nodeType": "node"}``.
The ``nodeType`` field is required by the server. Use
:meth:`rename_group` for scenes.
"""
await self._post_node(address, {"name": name, "nodeType": NodeType.NODE})
[docs]
async def rename_group(self, address: str, name: str) -> None:
"""Rename a group / scene.
Same endpoint as :meth:`rename_node` but with
``nodeType: "group"`` so the server applies the change
through the scene registry.
"""
await self._post_node(address, {"name": name, "nodeType": NodeType.GROUP})
[docs]
async def rename_folder(self, address: str, name: str) -> None:
"""Rename a folder (organisational container).
Same endpoint as :meth:`rename_node` / :meth:`rename_group`
but with ``nodeType: "folder"``. Folders are address-keyed
like nodes/groups; their addresses are typically 5-digit
integers (family ``"13"``).
"""
await self._post_node(address, {"name": name, "nodeType": NodeType.FOLDER})
async def _post_node(self, address: str, body: dict) -> None:
"""Internal: route a node mutation through the IoXClient."""
self._loaded_or_raise()
client = self._client
if client is None: # pragma: no cover
raise ControllerNotConnectedError("controller has no client")
await client.post_node_update(address, body)
# --- testing seams -------------------------------------------------
[docs]
def feed_event_frame(self, raw_frame: str) -> Event | None:
"""Inject a raw frame into the dispatcher.
Useful in tests and CLIs replaying captured WebSocket data.
Production code paths drive the dispatcher through the
:class:`WebSocketEventStream` reader.
"""
if self._dispatcher is None:
raise ControllerNotConnectedError("feed_event_frame requires connect() to have completed")
return self._dispatcher.feed(raw_frame)
# --- internals -----------------------------------------------------
def _loaded_or_raise(self) -> LoadResult:
if self._loaded is None:
raise ControllerNotConnectedError(
"controller is not connected; call await controller.connect() first"
)
return self._loaded
def _build_owned_session(self) -> aiohttp.ClientSession:
"""Construct an aiohttp session honouring our TLS settings.
The cookie jar is set ``unsafe=True`` so cookies set on a bare
IP host (typical LAN deployment) survive — aiohttp's default
jar rejects them as a precaution that doesn't apply to a
known-trusted LAN target.
"""
use_https = self._base_url.startswith("https")
context = build_sslcontext(
use_https=use_https,
tls_version=self._tls_version,
verify_ssl=self._verify_ssl,
)
connector = aiohttp.TCPConnector(ssl=context) if context is not None else None
return aiohttp.ClientSession(
connector=connector,
cookie_jar=aiohttp.CookieJar(unsafe=True),
)