"""Authentication strategies for IoX 6 controllers.
Two modes share a single :class:`Auth` protocol so the HTTP client can be
auth-agnostic:
* :class:`LocalAuth` — HTTP basic against ``:8443/rest/*`` with the local
admin account. Offline by construction. Feature-degraded: no
``/api/triggers`` AST, no ``/api/variables`` names; must use the legacy
XML ``/rest/nodes`` for structure. Recommended only when the user
refuses to use a portal account.
* :class:`PortalAuth` — JWT bearer obtained from ``POST :443/api/login``.
Auto-refreshes via ``POST :443/api/jwt/refresh``. Verified offline-safe
on 2026-05-07: the eisy validates portal credentials and signs the JWT
locally; no my.isy.io round-trip during steady-state. **Recommended
default** — unlocks the modern ``/api/*`` JSON surface.
The :class:`PortalAuth` flow leaks the PG3 MQTT TLS keypair under
``data.ssl`` in the login response. :func:`redact_sensitive` (see
``pyisyox.redactor``) MUST be applied to any debug-level logging of the
response body.
Endpoint discovery and shape verification: ``POST /api/login`` body
``{"username": "<email>", "password": "<password>"}``; response
``{"successful": true, "data": {"accessToken": "<es256-jwt>",
"refreshToken": "<es256-jwt>", "ssl": {...}, ...}}``. Refresh body
``{"refreshToken": "<rt>"}`` returns the same data shape.
"""
from __future__ import annotations
import asyncio
import base64
import json
import logging
import time
from dataclasses import dataclass
from typing import Any, Protocol
import aiohttp
_LOGGER = logging.getLogger(__name__)
[docs]
class AuthError(Exception):
"""Authentication failure (login rejected, refresh failed, etc.)."""
[docs]
@dataclass(slots=True)
class TokenPair:
"""JWT access + refresh tokens with decoded expiry for proactive refresh.
Attributes:
access_token: Short-lived bearer token (default 1 h TTL).
refresh_token: Long-lived token used to mint a new access token
(default 30 d TTL).
access_expires_at: Unix timestamp at which ``access_token``
expires. Decoded from the JWT ``exp`` claim. ``0`` if the
token couldn't be decoded.
refresh_expires_at: Unix timestamp at which ``refresh_token``
expires. ``0`` if undecoded.
"""
access_token: str
refresh_token: str
access_expires_at: float = 0.0
refresh_expires_at: float = 0.0
[docs]
@classmethod
def from_response(cls, data: dict[str, Any]) -> TokenPair:
"""Build a :class:`TokenPair` from a ``/api/login`` or
``/api/jwt/refresh`` response ``data`` dict."""
access = data["accessToken"]
refresh = data["refreshToken"]
return cls(
access_token=access,
refresh_token=refresh,
access_expires_at=_jwt_exp(access),
refresh_expires_at=_jwt_exp(refresh),
)
[docs]
def access_expires_within(self, seconds: float, now: float | None = None) -> bool:
"""True if the access token expires within ``seconds`` of ``now``.
Used to trigger a proactive refresh before a request, avoiding the
cost of one round-trip + one 401 + one refresh + one retry.
"""
if self.access_expires_at <= 0:
return False
current = now if now is not None else time.time()
return current + seconds >= self.access_expires_at
def _jwt_exp(token: str) -> float:
"""Extract the ``exp`` claim from a JWT, returning 0 if not parseable.
No signature verification — we trust whatever the eisy issued. The exp
claim is used solely for client-side proactive-refresh scheduling.
A return value of ``0`` is a sentinel meaning "expiry unknown";
:meth:`TokenPair.access_expires_within` treats this as
"skip proactive refresh", so reactive 401-handling is the only
recovery path until the operator notices the warning logged here
and updates the parser. Forcing immediate refresh on undecodable
tokens would loop endlessly if the eisy persisted in returning
malformed JWTs.
"""
try:
_, payload_b64, _ = token.split(".", 2)
except ValueError:
_LOGGER.warning(
"JWT does not have three dot-separated segments — skipping proactive "
"refresh; reactive 401 handling will still recover from server-side expiry."
)
return 0.0
padded = payload_b64 + "=" * (-len(payload_b64) % 4)
try:
payload = json.loads(base64.urlsafe_b64decode(padded))
except (ValueError, json.JSONDecodeError) as exc:
_LOGGER.warning(
"JWT payload not decodable as base64-url JSON (%s) — skipping proactive refresh",
exc,
)
return 0.0
exp = payload.get("exp")
if not isinstance(exp, (int, float)):
_LOGGER.warning(
"JWT payload missing numeric 'exp' claim (got %r) — skipping proactive refresh",
exp,
)
return 0.0
return float(exp)
[docs]
class Auth(Protocol):
"""Auth strategy protocol shared by :class:`LocalAuth` and :class:`PortalAuth`.
Not ``@runtime_checkable`` — ``isinstance(x, Auth)`` against an
unrelated class that happens to share these attribute names would
pass without verifying coroutine signatures, which masks bugs.
Tests construct the concrete classes directly.
The HTTP client calls :meth:`authenticate` once during connect, then
:meth:`request_kwargs` before every request to obtain the kwargs to
splat into ``session.get(...)``/``session.post(...)``. On a 401
response, the client calls :meth:`handle_unauthorized`; if it returns
True, the client retries the original request once.
"""
[docs]
async def authenticate(self, session: aiohttp.ClientSession, base_url: str) -> None:
"""Perform any one-time authentication setup (e.g., login POST)."""
[docs]
async def request_kwargs(self, session: aiohttp.ClientSession, base_url: str) -> dict[str, Any]:
"""Return kwargs for ``session.request()`` (auth, headers, etc.)."""
return {}
[docs]
async def handle_unauthorized(self, session: aiohttp.ClientSession, base_url: str) -> bool:
"""Handle a 401 response. Return True if re-auth succeeded and the
original request should be retried; False if the auth state cannot
recover (caller should propagate the 401 as a permanent error)."""
return False
[docs]
async def close(self, session: aiohttp.ClientSession, base_url: str) -> None:
"""Release any auth-held resources (e.g., explicit logout).
``session`` and ``base_url`` are passed so implementations can
make a final logout call to invalidate server-side state
(PortalAuth posts ``/api/logout``); LocalAuth ignores them
since basic-auth has no server-side session.
"""
[docs]
class LocalAuth:
"""HTTP basic auth against ``:8443/rest/*`` with the local admin account.
No login round-trip is needed — credentials are passed on every request.
A 401 on this path means the credentials are wrong, so re-auth cannot
recover.
"""
def __init__(self, username: str, password: str) -> None:
"""Store the local admin credentials.
Args:
username: Local admin username (typically ``"admin"``).
password: Local admin password.
"""
self._auth = aiohttp.BasicAuth(username, password)
[docs]
async def authenticate(self, session: aiohttp.ClientSession, base_url: str) -> None:
"""No-op — basic auth attaches per request."""
[docs]
async def request_kwargs(self, session: aiohttp.ClientSession, base_url: str) -> dict[str, Any]:
"""Return kwargs that attach HTTP basic auth."""
return {"auth": self._auth}
[docs]
async def handle_unauthorized(self, session: aiohttp.ClientSession, base_url: str) -> bool:
"""Cannot recover from 401 with basic auth — credentials are wrong."""
return False
[docs]
async def close(self, session: aiohttp.ClientSession, base_url: str) -> None:
"""No-op — basic auth has no server-side session to tear down."""
[docs]
class PortalAuth:
"""JWT bearer auth from ``POST :443/api/login``.
Maintains an in-memory :class:`TokenPair` with proactive refresh. On
401, attempts one refresh; if refresh fails (or has expired), falls
back to a fresh login.
Login URL: ``{base_url}/api/login``. Refresh URL:
``{base_url}/api/jwt/refresh``. Logout URL (optional, on
:meth:`close`): ``{base_url}/api/jwt/logout``. Verified against
eisy 1.0.3 — ``POST /api/jwt/logout`` returns ``200`` with
``{"successful": true, "data": null}``. (Pre-2026-05-12 versions
of this module used ``/api/logout``, which 404s.)
"""
LOGIN_PATH = "/api/login"
REFRESH_PATH = "/api/jwt/refresh"
LOGOUT_PATH = "/api/jwt/logout"
#: Number of seconds before access-token expiry at which we proactively refresh.
PROACTIVE_REFRESH_LEEWAY = 60.0
def __init__(self, email: str, password: str) -> None:
"""Store the portal credentials. No network calls happen here.
Args:
email: Portal email address. ``:443/api/login`` rejects
non-email usernames at the request-validation layer
(verified 2026-05-07).
password: Portal password.
"""
self._email = email
self._password = password
self._tokens: TokenPair | None = None
# Serialises _login / _refresh / _refresh_or_relogin so concurrent
# consumers (e.g. a background poll racing a user command inside
# the proactive-refresh leeway window) collapse onto a single
# network round-trip. Each entry re-checks the cached token state
# after acquiring the lock, so the second waiter no-ops.
self._auth_lock: asyncio.Lock | None = None
def _lock(self) -> asyncio.Lock:
"""Lazy-construct the lock so the constructor stays event-loop-free.
Lock creation needs a running event loop; the constructor is
called synchronously (often before the loop exists) so we defer
construction to the first ``async`` entry point.
"""
if self._auth_lock is None:
self._auth_lock = asyncio.Lock()
return self._auth_lock
@property
def tokens(self) -> TokenPair | None:
"""Currently held tokens, or ``None`` if not yet authenticated.
Tests use this to assert state without forcing a real network round-trip.
"""
return self._tokens
[docs]
async def authenticate(self, session: aiohttp.ClientSession, base_url: str) -> None:
"""Perform ``POST /api/login`` and store the returned token pair.
Concurrent calls collapse onto a single login round-trip via the
instance lock; the second caller observes the tokens already set
and returns without making a network request.
Raises:
AuthError: When the login response is not ``successful: true``
or lacks tokens.
"""
async with self._lock():
if self._tokens is not None:
return
await self._login_locked(session, base_url)
[docs]
async def request_kwargs(self, session: aiohttp.ClientSession, base_url: str) -> dict[str, Any]:
"""Return ``Authorization: Bearer <accessToken>`` headers.
Refreshes the token proactively if it expires within
:attr:`PROACTIVE_REFRESH_LEEWAY` seconds, avoiding the cost of an
in-flight 401 + refresh + retry round. Concurrent callers that
observe an expiring token both queue on the auth lock; the
winner refreshes once, the runners-up re-check and skip.
"""
if self._tokens is None:
raise AuthError("PortalAuth.request_kwargs called before authenticate()")
if self._tokens.access_expires_within(self.PROACTIVE_REFRESH_LEEWAY):
await self._refresh_or_relogin(session, base_url)
# Re-read after the lock — another coroutine may have refreshed.
if self._tokens is None: # pragma: no cover — defensive; refresh sets tokens or raises
raise AuthError("tokens disappeared during refresh")
return {"headers": {"Authorization": f"Bearer {self._tokens.access_token}"}}
[docs]
async def handle_unauthorized(self, session: aiohttp.ClientSession, base_url: str) -> bool:
"""Handle 401: try refresh, then re-login.
Concurrent 401s from in-flight requests all enter
``_refresh_or_relogin``; the first runs the refresh, subsequent
callers re-check the cached token (which has just been updated)
and skip the network round-trip. Returns True if re-auth
succeeded and the caller should retry the original request;
False if both refresh and login failed.
"""
token_at_call = self._tokens.access_token if self._tokens else None
try:
await self._refresh_or_relogin(session, base_url, observed_token=token_at_call)
except AuthError:
return False
return True
[docs]
async def close(self, session: aiohttp.ClientSession, base_url: str) -> None:
"""Best-effort logout against ``POST /api/jwt/logout``, then clear
the in-memory tokens.
If we don't tell the eisy we're done, the refresh token stays
live for its full 30-day TTL — useful only to attackers who
somehow obtain it. The logout call is best-effort: any error
(network down, controller already gone, stale token) is logged
at debug level and swallowed. The local token state is cleared
regardless so the consumer can construct a fresh
:class:`PortalAuth` and re-authenticate.
"""
if self._tokens is not None:
await self._logout(session, base_url)
self._tokens = None
async def _logout(self, session: aiohttp.ClientSession, base_url: str) -> None:
"""Post ``POST /api/jwt/logout`` with a *live* bearer token.
The access token is the credential the eisy authenticates the
logout call with. After a long WebSocket-only session it's
usually expired — proactive refresh only fires on REST calls
(:meth:`request_kwargs`), and an event-stream-driven consumer
makes almost none after init. Posting an expired bearer to
``/api/jwt/logout`` is what produces the intermittent ``HTTP
404`` / ``HTTP 500`` responses seen at shutdown (the firmware's
answer for "no such session" varies). So refresh first if the
token is at/near expiry; the refresh rotates the refresh token
too, so logging out with the new access token still invalidates
the current pair. If the refresh itself fails the refresh token
is already dead or the controller is unreachable — there's
nothing left to invalidate, so skip the logout call.
"""
if self._tokens is not None and self._tokens.access_expires_within(self.PROACTIVE_REFRESH_LEEWAY):
async with self._lock():
if self._tokens is not None and self._tokens.access_expires_within(
self.PROACTIVE_REFRESH_LEEWAY
):
try:
await self._refresh_locked(session, base_url)
except AuthError as exc:
_LOGGER.debug("pre-logout token refresh failed (%s); skipping logout", exc)
return
if self._tokens is None: # pragma: no cover — defensive; guarded by close()
return
url = f"{base_url.rstrip('/')}{self.LOGOUT_PATH}"
headers = {"Authorization": f"Bearer {self._tokens.access_token}"}
try:
async with session.post(url, headers=headers) as resp:
_LOGGER.debug("logout response: HTTP %s", resp.status)
except Exception as exc: # pylint: disable=broad-except
# Network errors during logout are not fatal — the token
# will expire naturally even if we couldn't tell the eisy.
_LOGGER.debug("logout call failed (%s); token will expire naturally", exc)
async def _refresh_or_relogin(
self,
session: aiohttp.ClientSession,
base_url: str,
*,
observed_token: str | None = None,
) -> None:
"""Refresh the access token (or re-login on failure), serialised.
``observed_token`` is the access token the caller saw when it
decided a refresh was needed. After acquiring the lock we
compare against the current token; if it has changed, another
coroutine already refreshed on our behalf and we no-op. This
collapses concurrent 401s and concurrent
proactive-refresh-window misses onto a single round-trip.
"""
async with self._lock():
current = self._tokens.access_token if self._tokens else None
if observed_token is not None and current != observed_token:
# Another waiter completed the refresh while we were queued.
return
if (
observed_token is None
and self._tokens is not None
and not self._tokens.access_expires_within(self.PROACTIVE_REFRESH_LEEWAY)
):
# Proactive path: while we waited for the lock, the
# winner refreshed and the new token has comfortable
# life left. Skip.
return
refreshed = False
if self._tokens is not None:
try:
await self._refresh_locked(session, base_url)
except AuthError:
# Refresh failed — fall through to a full login.
self._tokens = None
else:
refreshed = True
if not refreshed:
await self._login_locked(session, base_url)
async def _login_locked(self, session: aiohttp.ClientSession, base_url: str) -> None:
"""``POST /api/login`` — caller must hold ``_auth_lock``."""
body = {"username": self._email, "password": self._password}
url = f"{base_url.rstrip('/')}{self.LOGIN_PATH}"
async with session.post(url, json=body) as resp:
if resp.status != 200:
raise AuthError(f"login failed: HTTP {resp.status}")
data = await resp.json()
self._tokens = self._tokens_from_response(data, "login")
async def _refresh_locked(self, session: aiohttp.ClientSession, base_url: str) -> None:
"""``POST /api/jwt/refresh`` — caller must hold ``_auth_lock``."""
if self._tokens is None:
raise AuthError("cannot refresh without an existing refresh token")
body = {"refreshToken": self._tokens.refresh_token}
url = f"{base_url.rstrip('/')}{self.REFRESH_PATH}"
async with session.post(url, json=body) as resp:
if resp.status != 200:
raise AuthError(f"token refresh failed: HTTP {resp.status}")
data = await resp.json()
self._tokens = self._tokens_from_response(data, "refresh")
@staticmethod
def _tokens_from_response(payload: dict[str, Any], op: str) -> TokenPair:
"""Validate and unwrap the ``data`` envelope from login/refresh responses."""
if not payload.get("successful"):
raise AuthError(f"{op} response was not successful")
data = payload.get("data")
if not isinstance(data, dict) or "accessToken" not in data or "refreshToken" not in data:
raise AuthError(f"{op} response missing accessToken/refreshToken")
return TokenPair.from_response(data)