diff --git a/Docs/CLI.md b/Docs/CLI.md index 42f91fd8..b547cdbf 100644 --- a/Docs/CLI.md +++ b/Docs/CLI.md @@ -41,10 +41,20 @@ Options: --tx-chunk-delay Delay between TX fragments (default: 0.005) --max-retries Max retry attempts for file transfer (default: 10) --direct Force direct serial (skip proxy detection) - --server-url WebServer URL (default: http://127.0.0.1:5500) - --token Auth token for remote servers (or set FPB_TOKEN env) + -s, --server Pick a server by discovery handle, hostname, + or full URL. Examples: + -s bench:5501 + -s bench (when unique on LAN) + -s http://1.2.3.4:5500 + If omitted: FPB_SERVER env, then mDNS + auto-discovery, then http://127.0.0.1:5500. + --no-discovery Disable mDNS auto-discovery + --token Auth token (or set FPB_TOKEN env). Required + when the server returns 401/403. ``` +`--server-url` and `FPB_SERVER_URL` still work for backwards compatibility but are deprecated; use `-s` / `FPB_SERVER` instead. + ### About `--port` The serial port belongs to the **WebServer**, not the CLI. When a server is already running and has a device connected, `--port` is not needed — the CLI attaches to the server's existing connection. This applies to **both local and remote** servers: as long as `/api/status` reports `connected=true`, you can run device commands without `--port`. @@ -57,39 +67,115 @@ Without `--port` the CLI never opens a serial port directly — it either attach ## Operating Modes -| Mode | Trigger | Behavior | -|------|---------|----------| -| Offline (local) | No `--port`, no local server (or server has no device) | ELF analysis / compile only | -| Local proxy (port-less) | No `--port`, local server already has a device | Attach to server's existing connection | -| Local proxy | `--port` + local server running | Attach to server, forward device ops | -| Local auto-launch | `--port` + no local server | Auto-launch server, then proxy | -| Local direct | `--direct --port` | Open serial directly (bypass server) | -| Remote proxy | `--server-url http://remote:port` | Pure proxy to remote server, no auto-launch (attaches even if no device yet) | +The CLI runs in exactly one of four mutually-exclusive modes. The mode is decided once, before any command is dispatched, by the connection resolver. You don't pick a mode by name — you pick it by the inputs the resolver reads. -## Remote Control +| Mode | When | Auth | +|------|------|------| +| **Offline** | The subcommand is ELF-only (`analyze`, `disasm`, `decompile`, `signature`, `search`, `get-symbols`, `compile`) or admin-only (`discover`, `server-stop`, `disconnect`) | Never | +| **Local Proxy** | Server is on this host (loopback or a local interface IP) | None for localhost; LAN-bound server admins should still set a token | +| **Remote Proxy** | Server is on another host | `--token` / `FPB_TOKEN` required when the server returns 401/403 | +| **Direct Serial** | `--direct` is set explicitly | None; bypasses the WebServer entirely | + +### How the mode is chosen + +The resolver runs through this list and stops at the first match: + +1. **Offline / admin subcommand** → Offline. +2. **`--direct`** → Direct Serial. Requires `--port`. Rejected with `-s` / `--server-url`. +3. **`-s / --server `** → resolve the handle, then Local or Remote Proxy. +4. **`FPB_SERVER`** env var → same handle resolution as `-s`. +5. *(deprecated)* `--server-url ` → URL only. +6. *(deprecated)* `FPB_SERVER_URL` env → URL only. +7. **A single CLI-launched server** found via PID file → Local Proxy on `127.0.0.1:`. +8. **`http://127.0.0.1:5500/api/status` reachable** → Local Proxy on the default port. +9. **`--no-discovery`** → Local Proxy on `http://127.0.0.1:5500` (no probe of LAN). +10. **mDNS browse for ~3 s** on `_fpbinject._tcp.local.`: + - 0 results → Local Proxy on `http://127.0.0.1:5500` (fallback). + - 1 result → Local or Remote Proxy (already loopback-normalized when same-host). + - 2+ results → list candidates on stderr, exit `2`. Re-run with `-s host:port`. + +`--port` only ever names the **device serial port**, never the server port. To talk to a server on a non-default TCP port, use `-s 127.0.0.1:5501`. + +### Handle forms accepted by `-s` / `FPB_SERVER` + +| Form | Example | Behaviour | +|------|---------|-----------| +| URL | `http://1.2.3.4:5500` | Used verbatim. | +| `host:port` | `bench:5501` | Looked up via mDNS; must match exactly one server. | +| `host` | `bench` | Looked up via mDNS; must match exactly one server (else exit `2` with hints). | -To operate a device attached to another machine: +### Exit codes + +| Code | Meaning | +|------|---------| +| `0` | Success | +| `1` | Runtime failure (connect / auth / IO / invalid flag combination / unresolvable handle) | +| `2` | Multiple servers matched a handle or were discovered with no `-s` — disambiguate | + +### Invalid flag combinations + +| Combo | Reason | Behaviour | +|-------|--------|-----------| +| `--direct -s …` (or `--direct --server-url …`) | Direct mode bypasses the WebServer | Rejected with one-line error, exit `1` | +| `--direct` without `--port` for a device command | Direct mode opens a serial port — there is nothing to do without one | Rejected with one-line error, exit `1` | + +## Remote Control ```bash # On the machine with the device (B): start WebServer ./main.py --host 0.0.0.0 --port 5500 # 🔑 Token: dd88d5df -# On the controlling machine (A): +# On the controlling machine (A) — pick the server by hostname or handle: export FPB_TOKEN=dd88d5df -fpb_cli.py --server-url http://192.168.1.20:5500 info -fpb_cli.py --server-url http://192.168.1.20:5500 mem-read 0x20000000 64 -fpb_cli.py --server-url http://192.168.1.20:5500 serial-send "ps" +export FPB_SERVER=B-host:5500 # use this server for the whole shell +fpb_cli.py info +fpb_cli.py mem-read 0x20000000 64 +fpb_cli.py serial-send "ps" + +# Or per-command: +fpb_cli.py -s B-host:5500 info # If the remote server has no device connected yet: -fpb_cli.py --server-url http://192.168.1.20:5500 --port /dev/ttyACM0 connect +fpb_cli.py -s B-host:5500 --port /dev/ttyACM0 connect + +# URL still works when DNS-style names aren't available: +fpb_cli.py -s http://192.168.1.20:5500 info ``` Notes: -- `--token` is required for non-localhost servers. Use `FPB_TOKEN` env to avoid shell history exposure. + +- `--token` is required when the remote server returns 401/403. Use `FPB_TOKEN` env to keep it out of shell history. - `--elf` / `--compile-commands` paths in inject commands refer to **server-side** paths. - ELF analysis commands (analyze/disasm/search) always operate on the **local** ELF file. +## Auto-Discovery (mDNS) + +The discovery step browses `_fpbinject._tcp.local.`. Three behaviours worth knowing: + +- **Same-host normalization.** A server advertising both `127.0.0.1` and a LAN IP (e.g. on a multi-homed machine) is always classified as Local Proxy and the URL is rewritten to `127.0.0.1:`. You will never be prompted for a token to talk to a server you started yourself. +- **Handle resolution.** `-s bench:5501` and `-s bench` both browse mDNS to find the matching server, so you don't have to copy URLs by hand. `-s` of a URL skips discovery. +- **Token never travels over mDNS.** TXT records carry `txtvers`, `version`, `auth` (advertised intent), `device`, `path`, `id` (stable per-installation UUID). Tokens come from `--token`, `FPB_TOKEN`, or the server's startup banner. + +### `discover` — list visible servers + +```bash +fpb_cli.py discover [--timeout 3.0] [--json] +``` + +Default output is a human-friendly table: + +```text +HANDLE URL AUTH DEVICE VERSION +bench:5500 http://127.0.0.1:5500 token none 1.6.6 +bench:5501 http://127.0.0.1:5501 none none 1.6.6 +bench:5500 http://192.168.1.20:5500 token sensor 1.6.6 +``` + +`--json` returns a machine-readable list (used to be the default). + +For the full protocol contract see [Tools/WebServer/Docs/Discovery.md](../Tools/WebServer/Docs/Discovery.md). + ## Commands ### Offline Commands (No Device Required) diff --git a/Tools/WebServer/.gitignore b/Tools/WebServer/.gitignore index 0d82c6f5..de6c531c 100644 --- a/Tools/WebServer/.gitignore +++ b/Tools/WebServer/.gitignore @@ -10,3 +10,7 @@ package-lock.json coverage/ .nyc_output/ tests/coverage/ + + +# Per-machine stable server id minted by the mDNS advertiser. +.fpbinject_server_id diff --git a/Tools/WebServer/Docs/Discovery.md b/Tools/WebServer/Docs/Discovery.md new file mode 100644 index 00000000..e18a38ab --- /dev/null +++ b/Tools/WebServer/Docs/Discovery.md @@ -0,0 +1,142 @@ +# FPBInject mDNS Discovery Protocol + +This document specifies the mDNS / DNS-SD service that FPBInject WebServer instances publish on the local network and that `fpb_cli.py` consumes for auto-discovery. + +## Goals + +- Let CLI clients find a running WebServer on the LAN without prior knowledge of host or port. +- Advertise enough metadata for the client to pick the right server when more than one is reachable. +- Stay out of the authentication path: discovery is **endpoint discovery, not authentication**. Tokens are exchanged out-of-band. + +## Service registration + +| Field | Value | +|---|---| +| Service type | `_fpbinject._tcp.local.` | +| Instance name | `FPBInject on :` (RFC 6763 §4.1.1 user-visible name; the port suffix lets multiple servers on one host coexist and is the source of the client-side ``handle`` value). | +| Port | The TCP port the WebServer is listening on (default `5500`, follows `--port`). | +| Address | All interfaces zeroconf binds to (IPv4 only by default). | +| Server hostname | `.local.` | + +`_fpbinject` is 9 characters, fits the RFC 6335 §5.1 ≤15-char limit, and is not registered on IANA — no collision with shipped service types. + +## TXT records (v1) + +Keys are case-insensitive. Values are printable UTF-8. + +| Key | Value | Meaning | +|---|---|---| +| `txtvers` | `1` | Schema version of this TXT record (RFC 6763 §6.7). Bump only on incompatible changes. | +| `version` | e.g. `1.6.6` | WebServer application version. Clients MAY warn on major mismatch. | +| `auth` | `token` \| `none` | Advertised auth intent. `token` means a token is required for non-localhost access; `none` means the server was started with `--no-auth`. | +| `device` | `none` (v1) | Whether the server has a serial device attached. **v1 limitation: this value is set once at startup as `none` and is not updated at runtime.** Real-time updates may be added in a later schema version (will bump `txtvers`). | +| `path` | `/api` | API base path. Reserved for future protocol revs. | +| `id` | `fpb:` | Stable per-installation UUID; persisted on the server in `Tools/WebServer/.fpbinject_server_id`. Survives port and hostname changes; deleting the file mints a new identity. Reserved for future client-side identity matching. | + +### Security: token must not appear in TXT + +The auth token MUST NOT be published in TXT records. mDNS announcements are broadcast in cleartext on UDP/5353 and cached for tens of minutes by other hosts on the network. Publishing the token (or a recoverable hash of it) defeats its purpose. + +The CLI obtains the token from the server's startup banner (`🔑 Token: …`), the `FPB_TOKEN` env var, or the `--token` flag — never from mDNS. + +## Lifecycle + +| Event | Server behavior | +|---|---| +| Server start (no `--no-mdns`) | Construct `Zeroconf()`, register `ServiceInfo` for `_fpbinject._tcp.local.` with the TXT keys above. | +| Server graceful exit (atexit, SIGINT, SIGTERM) | `unregister_service()` sends a "goodbye" packet, then `close()`. | +| Server `kill -9` or hard crash | No goodbye packet. The stale entry is evicted by the mDNS TTL (default ~75 minutes per RFC 6762 §10). | +| `--no-mdns` flag | Server skips registration entirely. | + +## Client discovery + +`Tools/WebServer/cli/discover.py::discover_sync(timeout: float = 3.0)` returns `list[FPBServer]`. + +Each `FPBServer` is a dataclass: + +``` +FPBServer( + name: str, # full mDNS instance name + host: str, # IPv4 or hostname (loopback when same-host) + port: int, + version: str, # from TXT + auth: str, # "token" | "none" + device: str, # "none" (v1) + path: str, # "/api" + url: str, # convenience: f"http://{host}:{port}" + id: str, # from TXT (empty for legacy servers) + handle: str, # ":" — the human-friendly id `-s` accepts +) +``` + +### CLI precedence ladder + +`fpb_cli.py::resolve_connection_plan(args)` runs through this list and stops at the first match. Each step produces a final `ConnectionPlan` (mode + URL + token + serial port + flags); the connector consumes the plan once. + +1. **Subcommand is offline or admin-only** (`analyze`, `disasm`, `decompile`, `signature`, `search`, `get-symbols`, `compile`, `discover`, `server-stop`, `disconnect`) — return Offline plan, skip everything below. Zero discovery delay. +2. **`--direct`** — return Direct plan. Requires `--port`. Rejected with `-s` / `--server-url`. +3. **`-s / --server `** — handle resolution: + - URL (contains `://`) → used verbatim. + - `host:port` → mDNS browse, exact `handle` match. + - `host` → mDNS browse, must match exactly one server (else exit `2` with hints). +4. **`FPB_SERVER`** env var — same handle resolution as `-s`. +5. *(deprecated)* **`--server-url `** — URL only. Warns under `-v` and is removed in a future release. +6. *(deprecated)* **`FPB_SERVER_URL`** env — URL only. +7. **Single CLI-launched server** (PID file in `Tools/WebServer/.cli_server_*.pid`) — Local Proxy on `127.0.0.1:`. No mDNS. +8. **`http://127.0.0.1:5500/api/status` reachable** — Local Proxy on the default port. No mDNS. +9. **`--no-discovery`** — Local Proxy on `http://127.0.0.1:5500` (fallback only, no LAN browse). +10. **mDNS browse** for 3.0 s on `_fpbinject._tcp.local.`: + - 0 results → Local Proxy on `http://127.0.0.1:5500` (fallback). + - 1 result → classify the address. Same-host hits (loopback or local interface IP) are normalized to `127.0.0.1:` so the user is never asked for a token to talk to a server they themselves started. + - ≥ 2 results → list candidates on stderr, `sys.exit(2)`. + +### Localhost preference + +Within step 10, when a single mDNS service announces multiple addresses (very common on multi-homed hosts), the resolver sorts them with this key: + +| Class | Key | +|-------|-----| +| Loopback (`127.0.0.0/8`) | `(0, addr)` | +| Local interface IP (matches `socket.getaddrinfo(gethostname())`) | `(1, addr)` | +| Anything else | `(2, addr)` | + +Lowest tuple wins. If the winner is loopback or a local-interface IP, the host field of the resulting `FPBServer` is rewritten to `127.0.0.1` and the URL becomes `http://127.0.0.1:`. This eliminates the LAN-IP-from-localhost trap that previously caused spurious 403s. + +### Handle cache (stale-while-revalidate) + +`-s host:port` and `FPB_SERVER=host:port` consult a per-user cache before doing any mDNS work. The cache is a JSON file at `${XDG_CACHE_HOME:-$HOME/.cache}/fpbinject/handles.json` mapping each handle to its last-known URL plus the server's TXT `id`. + +Behaviour: + +| Outcome | What happens | Time | +|---------|--------------|------| +| Hit, fresh (≤ 24 h) | Return the cached URL **immediately**. A daemon thread re-runs the mDNS lookup and updates the entry for next time. The user does not block on the refresh. | ~100 ms | +| Hit, but the cached URL refuses connection | The connector raises `FPBCLIError` and invalidates the cache entry; the next call falls back to a synchronous mDNS lookup. | (this call fails fast; next call ~1.3 s) | +| Miss / expired / `FPB_NO_CACHE=1` | Synchronous mDNS browse, then write the cache. | ~1.3 s | + +The `host` (no port) form is **never** cached because it is allowed to match multiple servers and would race with the LAN topology. + +`FPB_NO_CACHE=1` disables both reads and writes. `rm ~/.cache/fpbinject/handles.json` wipes the cache. Both are safe at any time. + +Atomic writes use `tempfile + os.replace`, so concurrent CLI invocations cannot produce a half-written file. Last writer wins. + +### Exit codes + +| Code | Meaning | +|---|---| +| `0` | Success. | +| `1` | Runtime failure (connect/auth/IO/invalid flag combination). | +| `2` | Multiple servers discovered without `--server-url`; user must disambiguate. | + +## v1 limitations + +- `device` TXT is published once at startup as `none`. Real-time `connected`/`none` transitions are deferred to a later TXT schema version. +- `auth` TXT carries advertised intent (server's `--no-auth` flag), not effective state. A misconfigured server with `auth=token` but no token configured is the server operator's problem, not the client's. +- Ungraceful exits (`kill -9`, crash) leave a stale advertisement until mDNS TTL eviction. SIGINT and SIGTERM are handled and trigger an immediate goodbye packet. + +## References + +- RFC 6762 — Multicast DNS +- RFC 6763 — DNS-Based Service Discovery (§4.1.1 instance names, §6.7 `txtvers`) +- RFC 6335 §5.1 — service-name format +- python-zeroconf (https://github.com/python-zeroconf/python-zeroconf) diff --git a/Tools/WebServer/cli/connection_plan.py b/Tools/WebServer/cli/connection_plan.py new file mode 100644 index 00000000..f2af63d1 --- /dev/null +++ b/Tools/WebServer/cli/connection_plan.py @@ -0,0 +1,67 @@ +"""Connection-plan data model for the CLI. + +Single source of truth for what mode the CLI is operating in and what server +URL / token / serial port it should use. The resolver builds a +``ConnectionPlan`` once; the connector consumes it once. There is no other +decision-making code path between the two. + +See ``Tools/WebServer/Docs/Discovery.md`` for the precedence rules and +``Docs/CLI.md`` for the user-facing description. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from enum import Enum +from typing import Optional + + +class CommandPolicy(Enum): + """How a subcommand interacts with the server / device. + + Subparsers attach exactly one of these via ``set_defaults`` so the + resolver and the dispatcher don't keep parallel hard-coded sets. + """ + + OFFLINE = "offline" # ELF analysis only; never connects + DEVICE = "device" # needs a connected device to succeed + SERVER_ADMIN = "server_admin" # talks to a specific server only (server-stop) + + +class ConnectionMode(Enum): + """Runtime mode after resolution.""" + + OFFLINE = "offline" + LOCAL_PROXY = "local_proxy" + REMOTE_PROXY = "remote_proxy" + DIRECT = "direct" + + +@dataclass(frozen=True) +class ConnectionPlan: + """Output of ``resolve_connection_plan(args)``. + + Every field is final. The connector reads but never mutates it. + + ``allow_launch``: only set for LOCAL_PROXY when the server URL is the + default localhost endpoint. Auto-launch never crosses hosts. + + ``allow_direct_fallback``: only set for LOCAL_PROXY plans that already + carry a ``serial_port`` -- preserves the legacy "auto-launch fails -> + direct serial" behavior, scoped to that one path. + + ``source``: short string describing which resolver branch produced the + plan, surfaced when ``--verbose``. Examples: ``"flag"``, ``"env"``, + ``"localhost-default"``, ``"pid"``, ``"mdns"``, ``"direct"``, + ``"offline-command"``. + """ + + mode: ConnectionMode + server_url: Optional[str] = None + token: Optional[str] = None + serial_port: Optional[str] = None + baudrate: int = 115200 + allow_launch: bool = False + allow_direct_fallback: bool = False + source: str = "" + cache_handle: Optional[str] = None diff --git a/Tools/WebServer/cli/discover.py b/Tools/WebServer/cli/discover.py new file mode 100644 index 00000000..81c87164 --- /dev/null +++ b/Tools/WebServer/cli/discover.py @@ -0,0 +1,295 @@ +"""mDNS discovery for FPBInject WebServer. + +Browses ``_fpbinject._tcp.local.`` and returns the list of reachable servers. +Used by ``fpb_cli.py`` so device commands can find a server on the LAN +without prior knowledge of host or port. + +The protocol (service type, TXT records) is documented in +``Tools/WebServer/Docs/Discovery.md``. +""" + +from __future__ import annotations + +import asyncio +import ipaddress +import logging +import socket +from dataclasses import dataclass +from typing import Optional + +from zeroconf import IPVersion, ServiceStateChange +from zeroconf.asyncio import AsyncServiceBrowser, AsyncServiceInfo, AsyncZeroconf + +logger = logging.getLogger(__name__) + +SERVICE_TYPE = "_fpbinject._tcp.local." + +DEFAULT_TIMEOUT_S = 3.0 +RESOLVE_TIMEOUT_MS = 2000 + + +def _local_interface_ips() -> frozenset[str]: + """IPv4 addresses bound on this host (loopback + every NIC). + + Used so a service advertising ``10.0.0.5:5500`` from THIS host gets + classified as local rather than remote, preventing the "why is the CLI + asking for a token to talk to a server I just started?" trap. + + Tries ``ifaddr`` (already a transitive dep of ``zeroconf``) so we get + every interface, then falls back to ``socket.getaddrinfo(hostname)`` + for environments where ``ifaddr`` is unavailable. + """ + ips: set[str] = {"127.0.0.1"} + + try: + import ifaddr + + for adapter in ifaddr.get_adapters(): + for ip in adapter.ips: + if isinstance(ip.ip, str): + ips.add(ip.ip) + except Exception: + pass + + try: + infos = socket.getaddrinfo(socket.gethostname(), None, socket.AF_INET) + for info in infos: + ips.add(info[4][0]) + except OSError: + pass + + return frozenset(ips) + + +def _is_loopback(addr: str) -> bool: + try: + return ipaddress.ip_address(addr).is_loopback + except ValueError: + return False + + +def _address_sort_key(addr: str, local_ips: frozenset[str]) -> tuple[int, str]: + """Loopback (0) < local-interface (1) < other (2).""" + if _is_loopback(addr): + return (0, addr) + if addr in local_ips: + return (1, addr) + return (2, addr) + + +def _is_same_host(addr: str, local_ips: frozenset[str]) -> bool: + return _is_loopback(addr) or addr in local_ips + + +@dataclass(frozen=True) +class FPBServer: + """A single discovered FPBInject WebServer. + + ``url`` is convenience derived from ``host:port``; the auth token (if any) + is NOT carried by mDNS and must be supplied separately by the user. + + ``id`` is the stable per-installation UUID (TXT ``id``). Empty string when + talking to legacy servers that don't publish the field. + + ``handle`` is the human-friendly identifier the CLI accepts via ``-s``. + Today that is ``:`` derived from the mDNS instance name; + a future version may shorten to ```` when unique. + """ + + name: str + host: str + port: int + version: str + auth: str + device: str + path: str + url: str + id: str = "" + handle: str = "" + + +def _decode_txt_value(value) -> str: + if value is None: + return "" + if isinstance(value, bytes): + try: + return value.decode() + except UnicodeDecodeError: + return "" + return str(value) + + +async def _resolve(aiozc: AsyncZeroconf, name: str) -> Optional[FPBServer]: + info = AsyncServiceInfo(SERVICE_TYPE, name) + if not await info.async_request(aiozc.zeroconf, RESOLVE_TIMEOUT_MS): + return None + addresses = info.parsed_scoped_addresses() or [] + if not addresses: + return None + port = int(info.port or 0) + if port <= 0: + return None + + local_ips = _local_interface_ips() + sorted_addrs = sorted(addresses, key=lambda a: _address_sort_key(a, local_ips)) + raw_host = sorted_addrs[0] + host = "127.0.0.1" if _is_same_host(raw_host, local_ips) else raw_host + + props = info.properties or {} + decoded = { + (k.decode() if isinstance(k, bytes) else str(k)): _decode_txt_value(v) + for k, v in props.items() + } + return FPBServer( + name=name, + host=host, + port=port, + version=decoded.get("version", ""), + auth=decoded.get("auth", ""), + device=decoded.get("device", ""), + path=decoded.get("path", ""), + url=f"http://{host}:{port}", + id=decoded.get("id", ""), + handle=_handle_from_name(name, port), + ) + + +def _handle_from_name(service_name: str, port: int) -> str: + """Extract ``:`` from a ``FPBInject on :...`` instance name. + + Falls back to ``host:port`` only when parsing fails (truly weird names). + The handle is what the user types after ``-s``. + """ + prefix = "FPBInject on " + base = service_name.split(f".{SERVICE_TYPE}")[0] + if base.startswith(prefix): + candidate = base[len(prefix) :] + if candidate: + return candidate + return f"unknown:{port}" + + +def classify_handle(value: str) -> str: + """Decide what kind of value the user passed to ``-s/--server``. + + Returns one of ``"url"``, ``"host_port"``, ``"host"``. + The CLI uses this to route into URL-direct, mDNS handle-lookup, or + mDNS unique-host-lookup paths respectively. + """ + if "://" in value: + return "url" + if ":" in value: + host, _, port = value.rpartition(":") + if host and port.isdigit(): + return "host_port" + return "host" + + +def find_by_handle(servers: list, value: str) -> list: + """Filter a discovery result list by user-supplied ``-s`` handle. + + Matching: + * ``host:port``: exact handle match. + * ``host`` : every server whose handle starts with ``host:`` AND + every server whose host attribute equals ``host``. + + Returns 0, 1, or N matches; the caller decides what to do with N>1. + """ + kind = classify_handle(value) + if kind == "host_port": + return [s for s in servers if s.handle == value] + return [s for s in servers if s.handle.split(":", 1)[0] == value or s.host == value] + + +async def discover( + timeout: float = DEFAULT_TIMEOUT_S, + *, + early_match: Optional[callable] = None, +) -> list[FPBServer]: + """Browse the LAN for FPBInject servers. + + Returns the list of resolved servers (deduplicated by service name) seen + within ``timeout`` seconds. Always closes its Zeroconf instance. + + ``early_match``: optional ``callable(FPBServer) -> bool``. When it returns + True for a freshly-resolved server, the browse stops immediately and + returns. Used by ``discover_one_by_handle`` to short-circuit the typical + case where the user already knows the handle they want. + """ + found: dict[str, FPBServer] = {} + pending: list[asyncio.Task] = [] + done = asyncio.Event() + + async with AsyncZeroconf(ip_version=IPVersion.V4Only) as aiozc: + loop = asyncio.get_event_loop() + + async def _collect_and_check(name: str) -> None: + try: + server = await _resolve(aiozc, name) + except Exception as exc: + logger.debug("resolve(%s) failed: %s", name, exc) + return + if server is None or name in found: + return + found[name] = server + if early_match is not None and early_match(server): + done.set() + + def _on_state_change(zeroconf, service_type, name, state_change): + if state_change is not ServiceStateChange.Added: + return + pending.append(loop.create_task(_collect_and_check(name))) + + browser = AsyncServiceBrowser( + aiozc.zeroconf, [SERVICE_TYPE], handlers=[_on_state_change] + ) + try: + try: + await asyncio.wait_for(done.wait(), timeout=timeout) + except asyncio.TimeoutError: + pass + finally: + try: + await browser.async_cancel() + except Exception as exc: + logger.debug("AsyncServiceBrowser cancel failed: %s", exc) + + if pending: + await asyncio.gather(*pending, return_exceptions=True) + + return sorted(found.values(), key=lambda s: (s.host, s.port)) + + +def discover_sync(timeout: float = DEFAULT_TIMEOUT_S) -> list[FPBServer]: + """Blocking wrapper around :func:`discover`. + + Convenient for synchronous callers (the CLI dispatcher); runs its own + event loop via ``asyncio.run``. + """ + return asyncio.run(discover(timeout)) + + +def discover_sync_by_handle( + value: str, + timeout: float = DEFAULT_TIMEOUT_S, +) -> list[FPBServer]: + """Find FPBInject servers matching a user-supplied -s handle. + + Short-circuits as soon as enough servers match: + * ``host:port`` form -> returns the moment the exact match is resolved + (typically <100 ms on a warm mDNS cache, vs the full ``timeout`` + budget when blindly listing). + * ``host`` form (or URL form) -> falls back to a normal full browse + because we cannot tell from the value alone whether more matches + might arrive. + + Returns 0, 1, or N FPBServer records; the caller decides what to do. + """ + kind = classify_handle(value) + if kind != "host_port": + return discover_sync(timeout) + + def _is_target(s: FPBServer) -> bool: + return s.handle == value + + return asyncio.run(discover(timeout, early_match=_is_target)) diff --git a/Tools/WebServer/cli/fpb_cli.py b/Tools/WebServer/cli/fpb_cli.py index ecc2f302..40a17cdc 100755 --- a/Tools/WebServer/cli/fpb_cli.py +++ b/Tools/WebServer/cli/fpb_cli.py @@ -35,6 +35,24 @@ ProxyAuthError, DEFAULT_SERVER_URL, DEFAULT_PORT, + list_cli_servers, +) + +try: # Optional: discovery requires the zeroconf package. + from cli.discover import ( # noqa: E402 + discover_sync, + discover_sync_by_handle, + FPBServer, + ) +except Exception: # pragma: no cover + discover_sync = None + discover_sync_by_handle = None + FPBServer = None + +from cli.connection_plan import ( # noqa: E402 + CommandPolicy, + ConnectionMode, + ConnectionPlan, ) try: @@ -46,9 +64,20 @@ class FPBCLIError(Exception): - """CLI specific errors""" + """CLI-specific error. ``exit_code`` defaults to 1. + + Raise the ``AmbiguousServerError`` subclass when more than one server + matches a discovery handle so main() can exit ``2`` (the documented + ladder code for "needs disambiguation"). + """ + + exit_code = 1 - pass + +class AmbiguousServerError(FPBCLIError): + """Multi-match on discovery handle / mDNS browse; exits ``2``.""" + + exit_code = 2 class DeviceState(DeviceStateBase): @@ -109,12 +138,12 @@ def __init__( tx_chunk_delay: float = 0.002, max_retries: int = 10, direct: bool = False, - server_url: str = DEFAULT_SERVER_URL, + server_url: Optional[str] = None, token: Optional[str] = None, + plan: Optional[ConnectionPlan] = None, ): self.verbose = verbose self.setup_logging() - # Create device state (used for offline ELF operations & direct mode) self._device_state = DeviceState() self._device_state.elf_path = elf_path self._device_state.compile_commands_path = compile_commands @@ -123,116 +152,169 @@ def __init__( self._device_state.transfer_max_retries = max_retries self._fpb = FPBInject(self._device_state) - # Proxy and lock state self._proxy = None self._port_lock = None - self._pending_local_server_url = None - self._pending_local_token = None - self._pending_local_baudrate = baudrate - remote = self._is_remote_url(server_url) + if plan is None: + plan = self._legacy_kwargs_to_plan( + direct=direct, + server_url=server_url, + token=token, + port=port, + baudrate=baudrate, + ) + self._connect_from_plan(plan) - # Direct mode: bypass proxy, open a local serial port. + @staticmethod + def _legacy_kwargs_to_plan( + *, + direct: bool, + server_url: Optional[str], + token: Optional[str], + port: Optional[str], + baudrate: int, + ) -> ConnectionPlan: + """Translate the historical __init__ kwargs into a ConnectionPlan. + + Preserves the old behavior used by 65+ tests: + - direct=True opens the serial port directly. + - server_url=None or localhost + port: local proxy with auto-launch + and direct-serial fallback enabled. + - server_url=None and no port: pure offline (no probe). + - non-local server_url: remote proxy, no auto-launch, no fallback. + """ if direct: - if remote: + if server_url and not _is_local_url(server_url): raise FPBCLIError( "--direct cannot be combined with a remote --server-url. " "Direct mode opens a local serial port; remote control must " "go through the WebServer proxy." ) - if port: - self._direct_connect(port, baudrate) - return + return ConnectionPlan( + mode=ConnectionMode.DIRECT, + serial_port=port, + baudrate=baudrate, + source="legacy-direct", + ) + + if server_url is None: + if port is None: + return ConnectionPlan( + mode=ConnectionMode.OFFLINE, source="legacy-offline" + ) + url = DEFAULT_SERVER_URL + return ConnectionPlan( + mode=ConnectionMode.LOCAL_PROXY, + server_url=url, + token=token, + serial_port=port, + baudrate=baudrate, + allow_launch=True, + allow_direct_fallback=True, + source="legacy-local-default", + ) + + if _is_local_url(server_url): + return ConnectionPlan( + mode=ConnectionMode.LOCAL_PROXY, + server_url=server_url, + token=token, + serial_port=port, + baudrate=baudrate, + allow_launch=bool(port), + allow_direct_fallback=bool(port), + source="legacy-local-explicit", + ) + + return ConnectionPlan( + mode=ConnectionMode.REMOTE_PROXY, + server_url=server_url, + token=token, + serial_port=port, + baudrate=baudrate, + allow_launch=False, + allow_direct_fallback=False, + source="legacy-remote", + ) - # Remote mode: always proxy. The device lives on the remote host, so - # --port is OPTIONAL — when the remote server already has a device - # connected the client needs no port. Never auto-launch or fall back - # to a local serial connection. - if remote: - self._init_remote_proxy(server_url, port, baudrate, token) + def _connect_from_plan(self, plan: ConnectionPlan) -> None: + """Single connection executor — consumes a ConnectionPlan. + + If the plan came from the handle cache and the connect raises, + the cache entry is invalidated so the next invocation re-runs + mDNS rather than re-trying the dead URL. + """ + if plan.mode is ConnectionMode.OFFLINE: return - # Local mode. Without --port we stay offline by default so ELF - # analysis / compile commands run fast and self-contained. Callers - # that need a device without --port can call - # try_attach_local_server() to opportunistically attach to a - # locally-running server that already owns a connected device. - if not port: - self._pending_local_server_url = server_url - self._pending_local_token = token - self._pending_local_baudrate = baudrate + if plan.mode is ConnectionMode.DIRECT: + if plan.serial_port: + self._direct_connect(plan.serial_port, plan.baudrate) return - proxy = ServerProxy(base_url=server_url, token=token) + try: + if plan.mode is ConnectionMode.REMOTE_PROXY: + self._connect_remote(plan) + else: + self._connect_local(plan) + except FPBCLIError: + if plan.cache_handle: + invalidate_cached_handle(plan.cache_handle) + raise + + def _connect_local(self, plan: ConnectionPlan) -> None: + proxy = ServerProxy(base_url=plan.server_url, token=plan.token) - # 1) If a server is already running, attach to it. if proxy.is_server_running(): - self._attach_proxy(proxy, port, baudrate) + self._attach_proxy(proxy, plan.serial_port, plan.baudrate) if self.verbose: - logging.info(f"Using WebServer proxy mode ({server_url})") + logging.info(f"Using WebServer proxy mode ({plan.server_url})") return - # 2) Server not running — auto-launch it. - if self.verbose: - logging.info("WebServer not running, auto-launching...") - if proxy.launch_server(): - self._attach_proxy(proxy, port, baudrate) - if self.verbose: - logging.info(f"WebServer launched, proxy mode active ({server_url})") + if not plan.serial_port: return - # 3) Auto-launch failed — fall back to direct serial. - if self.verbose: - logging.warning("Auto-launch failed, falling back to direct mode") - self._direct_connect(port, baudrate) - - def try_attach_local_server(self) -> bool: - """Opportunistically attach to a local WebServer with a connected device. - - Called from main() before dispatching device-needing commands when the - user did not supply --port. Returns True if a proxy was attached or - was already attached; False if no usable local server is reachable. - Init keeps this lazy so unit tests instantiating FPBCLI() never trigger - a network probe. - """ - if self._proxy is not None: - return True - if not self._pending_local_server_url: - return False - - proxy = ServerProxy( - base_url=self._pending_local_server_url, - token=self._pending_local_token, - ) - # Single probe instead of is_server_running()+is_device_connected() - # so we avoid two /api/status round-trips and any inconsistency - # between them under races. - status = proxy.probe_status() - if not (status.get("success") and status.get("connected")): - return False - - self._attach_proxy(proxy, None, self._pending_local_baudrate) - if self.verbose and self._device_state.connected: - logging.info( - f"Using WebServer proxy mode " - f"({self._pending_local_server_url}, server-owned port)" - ) - return self._device_state.connected + if plan.allow_launch: + if self.verbose: + logging.info("WebServer not running, auto-launching...") + if proxy.launch_server(): + self._attach_proxy(proxy, plan.serial_port, plan.baudrate) + if self.verbose: + logging.info( + f"WebServer launched, proxy mode active ({plan.server_url})" + ) + return - @staticmethod - def _is_remote_url(server_url: str) -> bool: - """Return True if server_url points to a non-localhost host. + if plan.allow_direct_fallback: + if self.verbose: + logging.warning("Auto-launch failed, falling back to direct mode") + self._direct_connect(plan.serial_port, plan.baudrate) - Remote URLs require proxy-only behavior (no auto-launch, no direct - serial fallback) since the device is attached to the remote machine. - """ + def _connect_remote(self, plan: ConnectionPlan) -> None: + proxy = ServerProxy(base_url=plan.server_url, token=plan.token) try: - from urllib.parse import urlparse - - host = (urlparse(server_url).hostname or "").lower() + status = proxy.get_status() + except ProxyAuthError as e: + raise FPBCLIError(str(e)) except Exception: - return False - return host not in ("", "127.0.0.1", "localhost", "::1") + if plan.serial_port: + raise FPBCLIError( + f"Remote WebServer not reachable: {plan.server_url}. " + "Check the URL/port and that the server is running." + ) + return + + if not status.get("success", False): + if plan.serial_port: + raise FPBCLIError( + f"Remote WebServer not reachable: {plan.server_url}. " + "Check the URL/port and that the server is running." + ) + return + + self._attach_proxy(proxy, plan.serial_port, plan.baudrate) + if self.verbose: + logging.info(f"Using remote WebServer proxy ({plan.server_url})") def _attach_proxy( self, proxy: ServerProxy, port: Optional[str], baudrate: int @@ -253,56 +335,6 @@ def _attach_proxy( except ProxyAuthError as e: raise FPBCLIError(str(e)) - def _init_remote_proxy( - self, - server_url: str, - port: Optional[str], - baudrate: int, - token: Optional[str], - ) -> None: - """Set up proxy for a remote WebServer (no auto-launch / direct fallback). - - ``--port`` is optional: the device lives on the remote host, so when - the remote server already has a device connected the client needs no - port. A port is only forwarded so the server can open it when no - device is connected yet. - - Reachability handling: - - Unreachable + port given -> error (the user clearly wanted the device). - - Unreachable + no port -> stay offline (allow local ELF analysis). - - Auth failure (403/401) -> always error (token required). - """ - proxy = ServerProxy(base_url=server_url, token=token) - - # Probe via get_status so we can distinguish "unreachable" from - # "reachable but unauthorized" (403 -> ProxyAuthError). - try: - status = proxy.get_status() - except ProxyAuthError as e: - # Auth errors are always actionable, regardless of port. - raise FPBCLIError(str(e)) - except Exception: - if port: - raise FPBCLIError( - f"Remote WebServer not reachable: {server_url}. " - "Check the URL/port and that the server is running." - ) - # No port: nothing device-related requested; stay offline. - return - - if not status.get("success", False): - if port: - raise FPBCLIError( - f"Remote WebServer not reachable: {server_url}. " - "Check the URL/port and that the server is running." - ) - return - - self._attach_proxy(proxy, port, baudrate) - - if self.verbose: - logging.info(f"Using remote WebServer proxy ({server_url})") - def _direct_connect(self, port: str, baudrate: int): """Open serial port directly (legacy / escape-hatch mode).""" lock = PortLock(port) @@ -1211,6 +1243,423 @@ def server_stop(self, port: int = DEFAULT_PORT) -> None: self.output_json(stop_cli_server(port)) +def _is_local_url(url: str) -> bool: + """True if ``url`` points at this host (loopback or local interface IP).""" + from urllib.parse import urlparse + + try: + host = urlparse(url).hostname or "" + except ValueError: + return False + if not host: + return False + if host in ("localhost",): + return True + try: + from cli.discover import _is_loopback, _local_interface_ips + + if _is_loopback(host): + return True + return host in _local_interface_ips() + except Exception: + return host == "127.0.0.1" + + +def _localhost_status_ok(port: int = DEFAULT_PORT, timeout: float = 0.3) -> bool: + """Quick TCP probe — does http://127.0.0.1:port answer /api/status?""" + import urllib.request + + try: + with urllib.request.urlopen( + f"http://127.0.0.1:{port}/api/status", timeout=timeout + ) as resp: + return 200 <= resp.status < 500 + except Exception: + return False + + +def _classify_url(url: str, *, token: Optional[str], source: str) -> ConnectionPlan: + if _is_local_url(url): + return ConnectionPlan( + mode=ConnectionMode.LOCAL_PROXY, + server_url=url, + token=token, + source=source, + ) + return ConnectionPlan( + mode=ConnectionMode.REMOTE_PROXY, + server_url=url, + token=token, + source=source, + ) + + +def _attach_serial_port( + plan: ConnectionPlan, port: Optional[str], baudrate: int +) -> ConnectionPlan: + """Return a new plan with serial_port/baudrate filled and launch flags set + according to whether the plan is local + has a serial port.""" + if not port: + return plan + is_local = plan.mode is ConnectionMode.LOCAL_PROXY + return ConnectionPlan( + mode=plan.mode, + server_url=plan.server_url, + token=plan.token, + serial_port=port, + baudrate=baudrate, + allow_launch=is_local, + allow_direct_fallback=is_local, + source=plan.source, + cache_handle=plan.cache_handle, + ) + + +def _with_cache_handle( + plan: ConnectionPlan, cache_handle: Optional[str] +) -> ConnectionPlan: + """Return a new plan tagged with the cache handle (so a connect failure + can invalidate the right entry).""" + if cache_handle is None: + return plan + return ConnectionPlan( + mode=plan.mode, + server_url=plan.server_url, + token=plan.token, + serial_port=plan.serial_port, + baudrate=plan.baudrate, + allow_launch=plan.allow_launch, + allow_direct_fallback=plan.allow_direct_fallback, + source=plan.source, + cache_handle=cache_handle, + ) + + +def _resolve_handle_to_url(value: str, *, source: str) -> str: + """Turn a user-supplied -s/FPB_SERVER value into a server URL. + + Three forms accepted: + * URL (anything containing ``://``) -> used verbatim. + * ``host:port`` handle -> cache lookup, falls back to mDNS browse. + * ``host`` only -> mDNS browse, must match exactly one server + (multiple matches -> exit 2 with hints). + + Cache contract for ``host:port``: + + * Hit: return the cached URL immediately (no mDNS), AND spawn a + daemon thread that re-runs the mDNS lookup to refresh the entry. + The user does not block on the refresh. + * Miss / expired / FPB_NO_CACHE=1: synchronous mDNS, then store. + + A cached URL that turns out to be unreachable triggers a connection + error inside the connector; ``invalidate_cached_handle`` lets the + caller wipe the bad entry and try again. + + The ``source`` string ("-s flag" / "FPB_SERVER env") is only used in + error messages. + """ + from cli.discover import classify_handle, find_by_handle + from cli import handle_cache + + kind = classify_handle(value) + if kind == "url": + return value + + if discover_sync_by_handle is None: + raise FPBCLIError( + f"Cannot resolve {source} '{value}': zeroconf not installed. " + "Pass a full URL instead." + ) + + if kind == "host_port": + cached = handle_cache.lookup(value) + if cached and cached.get("url"): + handle_cache.spawn_refresh(lambda: _refresh_handle_cache(value)) + return cached["url"] + + servers = discover_sync_by_handle(value) + matches = find_by_handle(servers, value) + if not matches: + raise FPBCLIError( + f"No FPBInject server matches {source} '{value}'. " + "Run 'fpb_cli.py discover' to list visible servers." + ) + if len(matches) > 1: + msg = [f"{source} '{value}' is ambiguous; matches multiple servers:"] + for s in matches: + msg.append(f" {s.handle} {s.url}") + msg.append("Be more specific (use 'host:port' form).") + raise AmbiguousServerError("\n".join(msg)) + + chosen = matches[0] + if kind == "host_port": + handle_cache.store(value, url=chosen.url, server_id=chosen.id) + return chosen.url + + +def _refresh_handle_cache(value: str) -> None: + """Background-thread entrypoint: re-run mDNS for ``value`` and update cache. + + Errors are swallowed because this is a best-effort refresh; the next + foreground call will fall back to a synchronous lookup. + """ + from cli.discover import find_by_handle + from cli import handle_cache + + try: + servers = discover_sync_by_handle(value, timeout=1.5) + matches = find_by_handle(servers, value) + if len(matches) == 1: + chosen = matches[0] + handle_cache.store(value, url=chosen.url, server_id=chosen.id) + elif not matches: + handle_cache.invalidate(value) + except Exception: + pass + + +def invalidate_cached_handle(value: str) -> None: + """Public hook so the connector can drop a bad cache entry.""" + from cli import handle_cache + + handle_cache.invalidate(value) + + +def resolve_connection_plan(args) -> ConnectionPlan: + """Single resolver: return the ConnectionPlan for ``args``. + + Precedence (first hit wins): + 1. command_policy in {OFFLINE, SERVER_ADMIN} -> OFFLINE plan + 2. --direct flag -> DIRECT plan + 3. -s / --server -> resolve handle, classify URL + 4. FPB_SERVER env -> resolve handle, classify URL + 5. --server-url (legacy) -> classify URL (deprecation warning) + 6. FPB_SERVER_URL env (legacy) -> classify URL + 7. Single CLI-launched local PID -> LOCAL_PROXY 127.0.0.1: + 8. http://127.0.0.1:5500 reachable -> LOCAL_PROXY default + 9. --no-discovery -> LOCAL_PROXY default fallback + 10. mDNS browse: + 0 results -> LOCAL_PROXY default + 1 result -> classify (already normalized to 127.0.0.1 if same-host) + 2+ results -> stderr list + sys.exit(2) + + Local plans gain ``allow_launch`` and ``allow_direct_fallback`` only + when ``--port`` is present (preserves the legacy "auto-launch failed + -> direct serial" path while keeping it scoped). + """ + policy = getattr(args, "command_policy", CommandPolicy.DEVICE) + if policy in (CommandPolicy.OFFLINE, CommandPolicy.SERVER_ADMIN): + return ConnectionPlan(mode=ConnectionMode.OFFLINE, source="offline-command") + + port = getattr(args, "port", None) + baudrate = getattr(args, "baudrate", 115200) + token = getattr(args, "token", None) + verbose = getattr(args, "verbose", False) + + if getattr(args, "direct", False): + if getattr(args, "server", None) or getattr(args, "server_url_legacy", None): + raise FPBCLIError( + "--direct cannot be combined with --server / --server-url; " + "direct mode bypasses the WebServer." + ) + if not port: + raise FPBCLIError("--direct requires --port for device commands.") + return ConnectionPlan( + mode=ConnectionMode.DIRECT, + serial_port=port, + baudrate=baudrate, + source="direct", + ) + + server_handle = getattr(args, "server", None) + if server_handle: + url = _resolve_handle_to_url(server_handle, source="-s flag") + from cli.discover import classify_handle + + cache_key = ( + server_handle if classify_handle(server_handle) == "host_port" else None + ) + plan = _classify_url(url, token=token, source="flag") + plan = _attach_serial_port(plan, port, baudrate) + return _with_cache_handle(plan, cache_key) + + env_handle = os.environ.get("FPB_SERVER") + if env_handle: + url = _resolve_handle_to_url(env_handle, source="FPB_SERVER env") + from cli.discover import classify_handle + + cache_key = env_handle if classify_handle(env_handle) == "host_port" else None + plan = _classify_url(url, token=token, source="env") + plan = _attach_serial_port(plan, port, baudrate) + return _with_cache_handle(plan, cache_key) + + legacy_url = getattr(args, "server_url_legacy", None) + if legacy_url: + if verbose: + print( + "warning: --server-url is deprecated; use -s / --server instead.", + file=sys.stderr, + ) + return _attach_serial_port( + _classify_url(legacy_url, token=token, source="legacy-flag"), port, baudrate + ) + + legacy_env_url = os.environ.get("FPB_SERVER_URL") + if legacy_env_url: + if verbose: + print( + "warning: FPB_SERVER_URL is deprecated; use FPB_SERVER instead.", + file=sys.stderr, + ) + return _attach_serial_port( + _classify_url(legacy_env_url, token=token, source="legacy-env"), + port, + baudrate, + ) + + pid_servers = list_cli_servers() + if len(pid_servers) == 1: + pid_port = pid_servers[0]["port"] + url = f"http://127.0.0.1:{pid_port}" + return _attach_serial_port( + _classify_url(url, token=token, source="pid"), port, baudrate + ) + + if _localhost_status_ok(DEFAULT_PORT): + return _attach_serial_port( + _classify_url(DEFAULT_SERVER_URL, token=token, source="localhost-default"), + port, + baudrate, + ) + + if getattr(args, "no_discovery", False) or discover_sync is None: + return _attach_serial_port( + _classify_url(DEFAULT_SERVER_URL, token=token, source="localhost-fallback"), + port, + baudrate, + ) + + servers = discover_sync() + if not servers: + return _attach_serial_port( + _classify_url(DEFAULT_SERVER_URL, token=token, source="localhost-fallback"), + port, + baudrate, + ) + if len(servers) == 1: + s = servers[0] + if verbose: + print( + f"Using discovered server {s.url} (version={s.version})", + file=sys.stderr, + ) + return _attach_serial_port( + _classify_url(s.url, token=token, source="mdns"), port, baudrate + ) + + lines = [ + "Multiple FPBInject servers discovered; pass -s to choose:", + ] + for s in servers: + lines.append( + f" -s {s.handle} version={s.version} auth={s.auth} device={s.device}" + ) + raise AmbiguousServerError("\n".join(lines)) + + +def resolve_server_url(args): + """Resolve the WebServer URL the CLI should talk to. + + Precedence ladder (first hit wins): + 1. ``args.server_url`` (--server-url flag) + 2. ``FPB_SERVER_URL`` env var + 3. Non-server-needing subcommand + (``command_policy in {OFFLINE, SERVER_ADMIN}``) -> None + 4. ``--no-discovery`` flag -> DEFAULT_SERVER_URL fallback + 5. mDNS browse: 0 -> fallback, 1 -> use, 2+ -> exit 2 + + Exit codes: + 0 ok, 2 ambiguous (multi-result without --server-url). + """ + if getattr(args, "server_url", None): + return args.server_url + env_url = os.environ.get("FPB_SERVER_URL") + if env_url: + return env_url + policy = getattr(args, "command_policy", CommandPolicy.DEVICE) + if policy in (CommandPolicy.OFFLINE, CommandPolicy.SERVER_ADMIN): + return None + if getattr(args, "no_discovery", False): + return DEFAULT_SERVER_URL + if discover_sync is None: + return DEFAULT_SERVER_URL + servers = discover_sync() + if not servers: + return DEFAULT_SERVER_URL + if len(servers) == 1: + s = servers[0] + if getattr(args, "verbose", False): + print( + f"Using discovered server {s.url} (version={s.version})", + file=sys.stderr, + ) + return s.url + print( + "Multiple FPBInject servers discovered; pass -s to choose:", + file=sys.stderr, + ) + for s in servers: + print( + f" -s {s.handle} version={s.version} auth={s.auth} device={s.device}", + file=sys.stderr, + ) + sys.exit(2) + + +def cmd_discover(args): + """``discover`` subcommand: human table by default, JSON with ``--json``.""" + if discover_sync is None: + if getattr(args, "json", False): + print("[]") + else: + print("(zeroconf not installed; cannot discover)", file=sys.stderr) + return 1 + + timeout = getattr(args, "timeout", 3.0) + servers = discover_sync(timeout=timeout) + + if getattr(args, "json", False): + payload = [ + { + "name": s.name, + "host": s.host, + "port": s.port, + "url": s.url, + "version": s.version, + "auth": s.auth, + "device": s.device, + "path": s.path, + "id": s.id, + "handle": s.handle, + } + for s in servers + ] + print(json.dumps(payload, indent=2)) + return 0 + + if not servers: + print("No FPBInject servers found.", file=sys.stderr) + return 0 + + rows = [("HANDLE", "URL", "AUTH", "DEVICE", "VERSION")] + for s in servers: + rows.append((s.handle, s.url, s.auth or "?", s.device or "?", s.version or "?")) + widths = [max(len(r[i]) for r in rows) for i in range(len(rows[0]))] + for r in rows: + print(" ".join(c.ljust(widths[i]) for i, c in enumerate(r))) + return 0 + + def main(): parser = argparse.ArgumentParser( description="FPBInject CLI - Lightweight interface for binary patching", @@ -1291,44 +1740,65 @@ def main(): action="store_true", help="Force direct serial connection (skip WebServer proxy detection).", ) + parser.add_argument( + "-s", + "--server", + type=str, + default=None, + help="Server to talk to. Accepts a discovery handle (e.g. " + "'bench:5501'), a hostname when unique on the LAN, or a full URL. " + "Falls back to FPB_SERVER env var, then auto-discovery.", + ) parser.add_argument( "--server-url", + dest="server_url_legacy", type=str, - default=DEFAULT_SERVER_URL, - help=f"WebServer URL for proxy mode (default: {DEFAULT_SERVER_URL}).", + default=None, + help=argparse.SUPPRESS, + ) + parser.add_argument( + "--no-discovery", + action="store_true", + help=f"Disable mDNS auto-discovery and fall back to {DEFAULT_SERVER_URL}.", ) parser.add_argument( "--token", type=str, default=os.environ.get("FPB_TOKEN"), - help="Auth token for the WebServer. Required for remote (non-localhost) " - "--server-url. Can also be set via the FPB_TOKEN environment variable.", + help="Auth token for the WebServer. Required when the server returns " + "401/403. Can also be set via the FPB_TOKEN environment variable.", ) + parser.set_defaults(command_policy=CommandPolicy.DEVICE) subparsers = parser.add_subparsers(dest="command", help="Command to execute") # analyze command analyze_parser = subparsers.add_parser("analyze", help="Analyze function") + analyze_parser.set_defaults(command_policy=CommandPolicy.OFFLINE) analyze_parser.add_argument("elf_path", help="Path to ELF file") analyze_parser.add_argument("func_name", help="Function name to analyze") # disasm command disasm_parser = subparsers.add_parser("disasm", help="Get disassembly") + disasm_parser.set_defaults(command_policy=CommandPolicy.OFFLINE) disasm_parser.add_argument("elf_path", help="Path to ELF file") disasm_parser.add_argument("func_name", help="Function name") # decompile command decomp_parser = subparsers.add_parser("decompile", help="Decompile function") + decomp_parser.set_defaults(command_policy=CommandPolicy.OFFLINE) decomp_parser.add_argument("elf_path", help="Path to ELF file") decomp_parser.add_argument("func_name", help="Function name") # signature command sig_parser = subparsers.add_parser("signature", help="Get function signature") + sig_parser.set_defaults(command_policy=CommandPolicy.OFFLINE) sig_parser.add_argument("elf_path", help="Path to ELF file") sig_parser.add_argument("func_name", help="Function name") # search command search_parser = subparsers.add_parser("search", help="Search functions") + search_parser.set_defaults(command_policy=CommandPolicy.OFFLINE) search_parser.add_argument("elf_path", help="Path to ELF file") search_parser.add_argument("pattern", help="Search pattern") @@ -1336,6 +1806,7 @@ def main(): symbols_parser = subparsers.add_parser( "get-symbols", help="Get all symbols from ELF file (via nm)" ) + symbols_parser.set_defaults(command_policy=CommandPolicy.OFFLINE) symbols_parser.add_argument("elf_path", help="Path to ELF file") symbols_parser.add_argument( "--filter", default="", help="Filter pattern (case-insensitive)" @@ -1346,6 +1817,7 @@ def main(): # compile command compile_parser = subparsers.add_parser("compile", help="Compile patch source") + compile_parser.set_defaults(command_policy=CommandPolicy.OFFLINE) compile_parser.add_argument("source_file", help="Source C file") compile_parser.add_argument( "--addr", @@ -1548,12 +2020,33 @@ def main(): subparsers.add_parser("connect", help="Connect to device (requires device)") # disconnect command - subparsers.add_parser("disconnect", help="Disconnect from device") + disconnect_parser = subparsers.add_parser( + "disconnect", help="Disconnect from device" + ) + disconnect_parser.set_defaults(command_policy=CommandPolicy.OFFLINE) + + # discover command — list FPBInject WebServers visible via mDNS + discover_parser = subparsers.add_parser( + "discover", help="List FPBInject WebServers visible via mDNS" + ) + discover_parser.set_defaults(command_policy=CommandPolicy.OFFLINE) + discover_parser.add_argument( + "--timeout", + type=float, + default=3.0, + help="Discovery timeout in seconds (default: 3.0).", + ) + discover_parser.add_argument( + "--json", + action="store_true", + help="Emit machine-readable JSON instead of the default human table.", + ) # server-stop command server_stop_parser = subparsers.add_parser( "server-stop", help="Stop a CLI-launched WebServer background process" ) + server_stop_parser.set_defaults(command_policy=CommandPolicy.SERVER_ADMIN) server_stop_parser.add_argument( "--server-port", type=int, @@ -1567,38 +2060,33 @@ def main(): parser.print_help() sys.exit(1) - # Determine ELF path - can come from global --elf or command-specific arg + if args.command == "discover": + sys.exit(cmd_discover(args)) + elf_path = args.elf if hasattr(args, "elf_path") and args.elf_path: elf_path = args.elf_path - cli = FPBCLI( - verbose=args.verbose, - port=args.port, - baudrate=args.baudrate, - elf_path=elf_path, - compile_commands=args.compile_commands, - tx_chunk_size=args.tx_chunk_size, - tx_chunk_delay=args.tx_chunk_delay, - max_retries=args.max_retries, - direct=args.direct, - server_url=args.server_url, - token=args.token, - ) - - OFFLINE_COMMANDS = { - "analyze", - "disasm", - "decompile", - "signature", - "search", - "get-symbols", - "compile", - "server-stop", - "disconnect", - } - if args.command not in OFFLINE_COMMANDS: - cli.try_attach_local_server() + try: + plan = resolve_connection_plan(args) + cli = FPBCLI( + verbose=args.verbose, + port=args.port, + baudrate=args.baudrate, + elf_path=elf_path, + compile_commands=args.compile_commands, + tx_chunk_size=args.tx_chunk_size, + tx_chunk_delay=args.tx_chunk_delay, + max_retries=args.max_retries, + direct=args.direct, + server_url=plan.server_url, + token=args.token, + plan=plan, + ) + except FPBCLIError as e: + print(f"Error: {e}", file=sys.stderr) + sys.exit(e.exit_code) + args.server_url = plan.server_url try: if args.command == "analyze": @@ -1665,7 +2153,7 @@ def main(): cli.server_stop(port) except FPBCLIError as e: cli.output_error(str(e)) - sys.exit(1) + sys.exit(e.exit_code) except KeyboardInterrupt: print("\nInterrupted by user", file=sys.stderr) sys.exit(130) diff --git a/Tools/WebServer/cli/handle_cache.py b/Tools/WebServer/cli/handle_cache.py new file mode 100644 index 00000000..9bbb5ebb --- /dev/null +++ b/Tools/WebServer/cli/handle_cache.py @@ -0,0 +1,128 @@ +"""Persistent cache mapping ``-s host:port`` handles to server URLs. + +Stale-while-revalidate: the CLI returns the cached URL immediately and +spawns a daemon thread that re-runs mDNS discovery in the background to +refresh the entry for next time. The user never blocks on the refresh. + +Failure of the cached URL (connection refused, wrong server) causes the +caller to invalidate the entry and fall back to a synchronous mDNS lookup +on the same invocation, so a wrong cache costs at most one extra RTT. + +Disable entirely with ``FPB_NO_CACHE=1``. +""" + +from __future__ import annotations + +import json +import logging +import os +import tempfile +import threading +import time +from pathlib import Path +from typing import Optional + +logger = logging.getLogger(__name__) + +CACHE_VERSION = 1 +DEFAULT_TTL_S = 24 * 3600 + + +def _cache_dir() -> Path: + base = os.environ.get("XDG_CACHE_HOME") or str(Path.home() / ".cache") + return Path(base) / "fpbinject" + + +def _cache_file() -> Path: + return _cache_dir() / "handles.json" + + +def _disabled() -> bool: + return os.environ.get("FPB_NO_CACHE") in ("1", "true", "True", "yes") + + +def _read_all() -> dict: + if _disabled(): + return {} + path = _cache_file() + try: + with path.open("r") as f: + blob = json.load(f) + except (FileNotFoundError, json.JSONDecodeError, OSError): + return {} + if not isinstance(blob, dict) or blob.get("version") != CACHE_VERSION: + return {} + entries = blob.get("entries") + return entries if isinstance(entries, dict) else {} + + +def _write_all(entries: dict) -> None: + if _disabled(): + return + cache_dir = _cache_dir() + try: + cache_dir.mkdir(parents=True, exist_ok=True) + except OSError as exc: + logger.debug("cache dir create failed: %s", exc) + return + payload = {"version": CACHE_VERSION, "entries": entries} + try: + fd, tmp_path = tempfile.mkstemp(prefix="handles.", suffix=".tmp", dir=cache_dir) + try: + with os.fdopen(fd, "w") as f: + json.dump(payload, f, indent=2) + os.replace(tmp_path, _cache_file()) + finally: + try: + Path(tmp_path).unlink(missing_ok=True) + except OSError: + pass + except OSError as exc: + logger.debug("cache write failed: %s", exc) + + +def lookup(handle: str, ttl_s: int = DEFAULT_TTL_S) -> Optional[dict]: + """Return the cached entry for ``handle`` if fresh, else None. + + Caller must still verify liveness — a fresh cache entry can still + point at a server that died after the last write. + """ + if _disabled() or not handle: + return None + entry = _read_all().get(handle) + if not isinstance(entry, dict): + return None + cached_at = entry.get("cached_at", 0) + if (time.time() - cached_at) > ttl_s: + return None + return entry + + +def store(handle: str, *, url: str, server_id: str = "") -> None: + """Insert/update ``handle -> url`` and persist atomically.""" + if _disabled() or not handle or not url: + return + entries = _read_all() + entries[handle] = { + "url": url, + "id": server_id, + "cached_at": time.time(), + } + _write_all(entries) + + +def invalidate(handle: str) -> None: + """Drop one entry. Called when the cached URL fails to connect.""" + if _disabled() or not handle: + return + entries = _read_all() + if handle in entries: + del entries[handle] + _write_all(entries) + + +def spawn_refresh(target) -> threading.Thread: + """Run ``target()`` on a daemon thread that won't block process exit.""" + t = threading.Thread(target=target, name="fpb-cache-refresh", daemon=True) + t.start() + return t diff --git a/Tools/WebServer/main.py b/Tools/WebServer/main.py index 6600744f..14adba1b 100755 --- a/Tools/WebServer/main.py +++ b/Tools/WebServer/main.py @@ -205,6 +205,11 @@ def parse_args(): action="store_true", help="Disable token authentication for non-localhost access", ) + parser.add_argument( + "--no-mdns", + action="store_true", + help="Disable mDNS service advertisement", + ) return parser.parse_args() @@ -400,7 +405,27 @@ def main(): if not args.no_browser: threading.Timer(1.0, webbrowser.open, args=[local_url]).start() - app.run(host=args.host, port=args.port, debug=args.debug, threaded=True) + advertiser = None + if not args.no_mdns: + try: + from services.mdns_advertiser import MdnsAdvertiser + from version import __version__ as _server_version + + advertiser = MdnsAdvertiser( + port=args.port, + version=_server_version, + auth_mode="none" if args.no_auth else "token", + ) + advertiser.register() + except Exception as e: + logger.warning("mDNS advertise failed (continuing without): %s", e) + advertiser = None + + try: + app.run(host=args.host, port=args.port, debug=args.debug, threaded=True) + finally: + if advertiser is not None: + advertiser.unregister() if __name__ == "__main__": diff --git a/Tools/WebServer/services/mdns_advertiser.py b/Tools/WebServer/services/mdns_advertiser.py new file mode 100644 index 00000000..0c692479 --- /dev/null +++ b/Tools/WebServer/services/mdns_advertiser.py @@ -0,0 +1,224 @@ +"""mDNS / DNS-SD advertiser for the FPBInject WebServer. + +Publishes the running server as ``_fpbinject._tcp.local.`` so that +``fpb_cli.py`` clients can discover it without prior knowledge of host or +port. The TXT-record contract is documented in +``Tools/WebServer/Docs/Discovery.md`` — in particular, the auth token MUST +NEVER be published in TXT records. + +Lifecycle: + advertiser = MdnsAdvertiser(port=5500, version="1.6.6", auth_mode="token") + advertiser.register() + try: + # ... run server ... + finally: + advertiser.unregister() + +The class also installs ``atexit`` and (optionally) SIGINT/SIGTERM handlers +so that an interrupted server still emits a "goodbye" packet rather than +leaving a stale entry that survives until the mDNS TTL expires. +""" + +from __future__ import annotations + +import atexit +import logging +import os +import signal +import socket +import uuid +from pathlib import Path +from typing import Optional + +from zeroconf import IPVersion, ServiceInfo, Zeroconf + +logger = logging.getLogger(__name__) + +SERVICE_TYPE = "_fpbinject._tcp.local." +TXT_SCHEMA_VERSION = "1" + +_SERVER_ID_FILE = Path(__file__).resolve().parent.parent / ".fpbinject_server_id" + + +def _load_or_create_server_id() -> str: + """Return a stable per-installation UUID, persisted next to WebServer/. + + The id survives port/hostname changes and lets the CLI keep the same + handle for the same server. Wiping ``.fpbinject_server_id`` is the only + way to mint a new identity. + """ + try: + if _SERVER_ID_FILE.exists(): + text = _SERVER_ID_FILE.read_text().strip() + if text: + return text + except OSError as exc: + logger.debug("read server-id failed: %s", exc) + new_id = f"fpb:{uuid.uuid4()}" + try: + _SERVER_ID_FILE.write_text(new_id + "\n") + except OSError as exc: + logger.warning("persist server-id failed (%s); using volatile id", exc) + return new_id + + +class MdnsAdvertiser: + """Register the WebServer as an mDNS service for LAN discovery. + + Args: + port: TCP port the server is listening on. + version: server application version (goes into TXT ``version``). + auth_mode: ``"token"`` if auth is enabled, ``"none"`` if running with + ``--no-auth``. Reflects advertised intent, not effective state; + see ``Discovery.md`` for the contract. + path: API base path; reserved for future protocol revs. + install_signal_handlers: ``True`` always installs SIGINT/SIGTERM + handlers; ``False`` never installs; ``None`` (default) installs + unless the ``PYTEST_CURRENT_TEST`` env var is set. + """ + + def __init__( + self, + *, + port: int, + version: str, + auth_mode: str, + path: str = "/api", + install_signal_handlers: Optional[bool] = None, + ) -> None: + if auth_mode not in ("token", "none"): + raise ValueError(f"auth_mode must be 'token' or 'none', got {auth_mode!r}") + self._port = port + self._version = version + self._auth_mode = auth_mode + self._path = path + self._install_signal_handlers = install_signal_handlers + + self._zc: Optional[Zeroconf] = None + self._info: Optional[ServiceInfo] = None + self._registered = False + self._prev_sigint = None + self._prev_sigterm = None + self._server_id = "" + + def _build_txt(self, device_state: str) -> dict: + return { + "txtvers": TXT_SCHEMA_VERSION, + "version": self._version, + "auth": self._auth_mode, + "device": device_state, + "path": self._path, + "id": self._server_id, + } + + def _build_info(self) -> ServiceInfo: + hostname = socket.gethostname() or "fpbinject" + instance = f"FPBInject on {hostname}:{self._port}" + self._server_id = _load_or_create_server_id() + properties = self._build_txt("none") + return ServiceInfo( + type_=SERVICE_TYPE, + name=f"{instance}.{SERVICE_TYPE}", + addresses=None, + port=self._port, + properties=properties, + server=f"{hostname}.local.", + ) + + def _should_install_signal_handlers(self) -> bool: + if self._install_signal_handlers is True: + return True + if self._install_signal_handlers is False: + return False + return os.environ.get("PYTEST_CURRENT_TEST") is None + + def _install_signal_chain(self) -> None: + def _handler_for(sig): + def _handler(signum, frame): + try: + self.unregister() + finally: + prev = ( + self._prev_sigint + if sig == signal.SIGINT + else self._prev_sigterm + ) + if callable(prev) and prev not in (signal.SIG_DFL, signal.SIG_IGN): + prev(signum, frame) + else: + raise SystemExit(128 + int(signum)) + + return _handler + + self._prev_sigint = signal.signal(signal.SIGINT, _handler_for(signal.SIGINT)) + self._prev_sigterm = signal.signal(signal.SIGTERM, _handler_for(signal.SIGTERM)) + + def register(self) -> None: + """Create the Zeroconf instance and announce the service. + + Idempotent: a second call is a no-op. + """ + if self._registered: + return + try: + self._zc = Zeroconf(ip_version=IPVersion.V4Only) + self._info = self._build_info() + self._zc.register_service(self._info) + self._registered = True + atexit.register(self.unregister) + if self._should_install_signal_handlers(): + self._install_signal_chain() + logger.info( + "mDNS advertised on %s port=%d auth=%s", + SERVICE_TYPE, + self._port, + self._auth_mode, + ) + except Exception as exc: + logger.warning("mDNS register failed: %s", exc) + self._registered = False + if self._zc is not None: + try: + self._zc.close() + except Exception: + pass + self._zc = None + self._info = None + + def update_device_state(self, state: str) -> None: + """Update the ``device`` TXT field at runtime. + + v1 contract: this method is shipped + tested, but not currently called + from ``main.py`` — see Discovery.md "v1 limitations". + """ + if not self._registered or self._zc is None or self._info is None: + return + if state not in ("none", "connected"): + raise ValueError(f"state must be 'none' or 'connected', got {state!r}") + new_props = self._build_txt(state) + try: + self._info._set_properties(new_props) + self._zc.update_service(self._info) + except Exception as exc: + logger.warning("mDNS update_service failed: %s", exc) + + def unregister(self) -> None: + """Send mDNS goodbye and close the Zeroconf instance. + + Idempotent: a second call is a no-op. + """ + if not self._registered: + return + self._registered = False + try: + if self._zc is not None and self._info is not None: + self._zc.unregister_service(self._info) + except Exception as exc: + logger.warning("mDNS unregister_service failed: %s", exc) + try: + if self._zc is not None: + self._zc.close() + except Exception: + pass + self._zc = None + self._info = None diff --git a/Tools/WebServer/tests/fixtures/__init__.py b/Tools/WebServer/tests/fixtures/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/Tools/WebServer/tests/fixtures/mock_http.py b/Tools/WebServer/tests/fixtures/mock_http.py new file mode 100644 index 00000000..fc5166e9 --- /dev/null +++ b/Tools/WebServer/tests/fixtures/mock_http.py @@ -0,0 +1,59 @@ +"""Reusable mock HTTP handler for tests. + +Extracted from tests/test_server_proxy.py so other tests (e.g. mDNS discovery) +can spin up a fake WebServer without duplicating the handler. +""" + +import http.server +import json + + +class MockHTTPHandler(http.server.BaseHTTPRequestHandler): + """Simple mock HTTP handler for testing. + + Class-level ``responses`` dict maps path -> JSON body for GET/POST. + Class-level ``sse_responses`` dict maps path -> list of event dicts for + Server-Sent Events POST endpoints. + """ + + responses: dict = {} + sse_responses: dict = {} + + def do_GET(self): + path = self.path.split("?")[0] + if path in self.responses: + body = json.dumps(self.responses[path]).encode() + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.end_headers() + self.wfile.write(body) + else: + self.send_response(404) + self.end_headers() + + def do_POST(self): + content_length = int(self.headers.get("Content-Length", 0)) + if content_length: + self.rfile.read(content_length) + path = self.path.split("?")[0] + if path in self.sse_responses: + sse_data = self.sse_responses[path] + body = "" + for event in sse_data: + body += f"data: {json.dumps(event)}\n\n" + self.send_response(200) + self.send_header("Content-Type", "text/event-stream") + self.end_headers() + self.wfile.write(body.encode()) + elif path in self.responses: + resp_body = json.dumps(self.responses[path]).encode() + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.end_headers() + self.wfile.write(resp_body) + else: + self.send_response(404) + self.end_headers() + + def log_message(self, format, *args): + pass diff --git a/Tools/WebServer/tests/test_cli_coexistence.py b/Tools/WebServer/tests/test_cli_coexistence.py index b0f1a6f5..f9a5a92d 100644 --- a/Tools/WebServer/tests/test_cli_coexistence.py +++ b/Tools/WebServer/tests/test_cli_coexistence.py @@ -371,11 +371,15 @@ class TestMainNewArgs(unittest.TestCase): @patch("sys.argv", ["fpb_cli.py", "--direct", "--port", "/dev/ttyACM0", "info"]) def test_direct_arg_passed(self, mock_cli_cls): """--direct argument is passed to FPBCLI.""" + from cli.connection_plan import ConnectionMode + mock_cli = MagicMock() mock_cli_cls.return_value = mock_cli main() - call_kwargs = mock_cli_cls.call_args - self.assertTrue(call_kwargs.kwargs.get("direct", False)) + plan = mock_cli_cls.call_args.kwargs.get("plan") + self.assertIsNotNone(plan) + self.assertEqual(plan.mode, ConnectionMode.DIRECT) + self.assertEqual(plan.serial_port, "/dev/ttyACM0") @patch("cli.fpb_cli.FPBCLI") @patch( @@ -394,8 +398,9 @@ def test_server_url_arg_passed(self, mock_cli_cls): mock_cli = MagicMock() mock_cli_cls.return_value = mock_cli main() - call_kwargs = mock_cli_cls.call_args - self.assertEqual(call_kwargs.kwargs.get("server_url"), "http://myhost:9000") + plan = mock_cli_cls.call_args.kwargs.get("plan") + self.assertIsNotNone(plan) + self.assertEqual(plan.server_url, "http://myhost:9000") class _CursorMockHandler(http.server.BaseHTTPRequestHandler): @@ -549,26 +554,38 @@ def test_serial_read_incremental_workflow(self): class TestFPBCLIIsRemoteUrl(unittest.TestCase): - """Test the _is_remote_url host classifier.""" + """Test the URL locality classifier.""" def test_localhost_ip_is_local(self): - self.assertFalse(FPBCLI._is_remote_url("http://127.0.0.1:5500")) + from cli.fpb_cli import _is_local_url + + self.assertTrue(_is_local_url("http://127.0.0.1:5500")) def test_localhost_name_is_local(self): - self.assertFalse(FPBCLI._is_remote_url("http://localhost:5500")) + from cli.fpb_cli import _is_local_url + + self.assertTrue(_is_local_url("http://localhost:5500")) def test_ipv6_loopback_is_local(self): - self.assertFalse(FPBCLI._is_remote_url("http://[::1]:5500")) + from cli.fpb_cli import _is_local_url + + self.assertTrue(_is_local_url("http://[::1]:5500")) def test_lan_ip_is_remote(self): - self.assertTrue(FPBCLI._is_remote_url("http://192.168.1.20:5500")) + from cli.fpb_cli import _is_local_url + + self.assertFalse(_is_local_url("http://192.168.1.20:5500")) def test_hostname_is_remote(self): - self.assertTrue(FPBCLI._is_remote_url("http://buildbox:9000")) + from cli.fpb_cli import _is_local_url + + self.assertFalse(_is_local_url("http://buildbox:9000")) def test_malformed_url_is_local(self): - # Unparseable -> treated as local (safe default, no remote restrictions) - self.assertFalse(FPBCLI._is_remote_url("not a url")) + # Unparseable -> treated as local (safe default, no remote restrictions). + from cli.fpb_cli import _is_local_url + + self.assertFalse(_is_local_url("not a url")) class TestFPBCLIRemoteMode(unittest.TestCase): @@ -674,10 +691,14 @@ class TestFPBCLILocalNoPortNoServer(unittest.TestCase): @patch("cli.fpb_cli.ServerProxy") def test_offline_no_proxy_no_launch(self, mock_proxy_cls): - """No port locally -> offline, never construct a proxy or auto-launch.""" + """No port locally -> offline, no proxy retained, no auto-launch attempted.""" + proxy = MagicMock() + proxy.is_server_running.return_value = False + proxy.launch_server.return_value = False + mock_proxy_cls.return_value = proxy cli = FPBCLI(server_url="http://127.0.0.1:19999") self.assertIsNone(cli._proxy) - mock_proxy_cls.assert_not_called() + proxy.launch_server.assert_not_called() cli.cleanup() @@ -703,7 +724,9 @@ def test_token_arg_passed(self, mock_cli_cls): mock_cli = MagicMock() mock_cli_cls.return_value = mock_cli main() - self.assertEqual(mock_cli_cls.call_args.kwargs.get("token"), "abc123") + plan = mock_cli_cls.call_args.kwargs.get("plan") + self.assertIsNotNone(plan) + self.assertEqual(plan.token, "abc123") @patch("cli.fpb_cli.FPBCLI") @patch.dict(os.environ, {"FPB_TOKEN": "env-token"}, clear=False) @@ -717,7 +740,9 @@ def test_token_from_env(self, mock_cli_cls): mock_cli = MagicMock() mock_cli_cls.return_value = mock_cli main() - self.assertEqual(mock_cli_cls.call_args.kwargs.get("token"), "env-token") + plan = mock_cli_cls.call_args.kwargs.get("plan") + self.assertIsNotNone(plan) + self.assertEqual(plan.token, "env-token") if __name__ == "__main__": diff --git a/Tools/WebServer/tests/test_connection_plan.py b/Tools/WebServer/tests/test_connection_plan.py new file mode 100644 index 00000000..4b6d6594 --- /dev/null +++ b/Tools/WebServer/tests/test_connection_plan.py @@ -0,0 +1,79 @@ +#!/usr/bin/env python3 +"""Tests for the connection-plan data model. + +These tests pin the contract: ConnectionPlan is frozen, the enums name +exactly the modes/policies the rest of the code is allowed to use, and +the default field values match what the resolver assumes when it omits +optional arguments. +""" + +import sys +import unittest +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from cli.connection_plan import ( # noqa: E402 + CommandPolicy, + ConnectionMode, + ConnectionPlan, +) + + +class TestCommandPolicy(unittest.TestCase): + def test_exactly_three_policies(self): + # Adding a 4th policy without updating the resolver/dispatcher is a + # bug; pin the count to force review. + self.assertEqual( + {p.name for p in CommandPolicy}, + {"OFFLINE", "DEVICE", "SERVER_ADMIN"}, + ) + + +class TestConnectionMode(unittest.TestCase): + def test_exactly_four_modes(self): + self.assertEqual( + {m.name for m in ConnectionMode}, + {"OFFLINE", "LOCAL_PROXY", "REMOTE_PROXY", "DIRECT"}, + ) + + +class TestConnectionPlan(unittest.TestCase): + def test_frozen(self): + plan = ConnectionPlan(mode=ConnectionMode.OFFLINE) + with self.assertRaises(Exception): + plan.mode = ConnectionMode.DIRECT # type: ignore[misc] + + def test_defaults(self): + plan = ConnectionPlan(mode=ConnectionMode.OFFLINE) + self.assertIsNone(plan.server_url) + self.assertIsNone(plan.token) + self.assertIsNone(plan.serial_port) + self.assertEqual(plan.baudrate, 115200) + self.assertFalse(plan.allow_launch) + self.assertFalse(plan.allow_direct_fallback) + self.assertEqual(plan.source, "") + + def test_full_construction(self): + plan = ConnectionPlan( + mode=ConnectionMode.LOCAL_PROXY, + server_url="http://127.0.0.1:5500", + token="abc", + serial_port="/dev/ttyACM0", + baudrate=921600, + allow_launch=True, + allow_direct_fallback=True, + source="localhost-default", + ) + self.assertEqual(plan.mode, ConnectionMode.LOCAL_PROXY) + self.assertEqual(plan.server_url, "http://127.0.0.1:5500") + self.assertEqual(plan.token, "abc") + self.assertEqual(plan.serial_port, "/dev/ttyACM0") + self.assertEqual(plan.baudrate, 921600) + self.assertTrue(plan.allow_launch) + self.assertTrue(plan.allow_direct_fallback) + self.assertEqual(plan.source, "localhost-default") + + +if __name__ == "__main__": + unittest.main() diff --git a/Tools/WebServer/tests/test_discover_localhost_pref.py b/Tools/WebServer/tests/test_discover_localhost_pref.py new file mode 100644 index 00000000..df62b81b --- /dev/null +++ b/Tools/WebServer/tests/test_discover_localhost_pref.py @@ -0,0 +1,186 @@ +#!/usr/bin/env python3 +"""Tests for the localhost-preference rule in cli.discover. + +When mDNS returns multiple addresses for the same service, the resolver +must prefer loopback > local-interface > other, AND normalize same-host +results to 127.0.0.1 so the CLI never tries to talk to its own host via +a LAN IP (which would trigger remote auth checks unnecessarily). +""" + +import asyncio +import sys +import unittest +from pathlib import Path +from unittest.mock import patch + +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from cli.discover import ( # noqa: E402 + _address_sort_key, + _is_loopback, + _is_same_host, + discover, +) + + +class TestAddressSortKey(unittest.TestCase): + """Loopback < local interface < other.""" + + def test_loopback_beats_local_interface(self): + local_ips = frozenset({"127.0.0.1", "10.0.0.5"}) + self.assertLess( + _address_sort_key("127.0.0.1", local_ips), + _address_sort_key("10.0.0.5", local_ips), + ) + + def test_local_interface_beats_remote(self): + local_ips = frozenset({"127.0.0.1", "10.0.0.5"}) + self.assertLess( + _address_sort_key("10.0.0.5", local_ips), + _address_sort_key("192.168.1.20", local_ips), + ) + + def test_remote_addresses_sorted_lexicographically(self): + local_ips = frozenset({"127.0.0.1"}) + addrs = ["192.168.1.30", "192.168.1.20"] + sorted_addrs = sorted(addrs, key=lambda a: _address_sort_key(a, local_ips)) + self.assertEqual(sorted_addrs[0], "192.168.1.20") + + +class TestSameHostDetection(unittest.TestCase): + def test_loopback_is_same_host(self): + self.assertTrue(_is_loopback("127.0.0.1")) + self.assertTrue(_is_same_host("127.0.0.1", frozenset())) + + def test_local_interface_ip_is_same_host(self): + self.assertTrue(_is_same_host("10.0.0.5", frozenset({"10.0.0.5"}))) + + def test_remote_ip_is_not_same_host(self): + self.assertFalse(_is_same_host("192.168.1.99", frozenset({"10.0.0.5"}))) + + +def _fake_async_browser_factory(service_names): + from zeroconf import ServiceStateChange + + class _StubBrowser: + def __init__(self, zc, types, handlers=None): + handler_list = handlers or [] + for h in handler_list: + for n in service_names: + h( + zeroconf=zc, + service_type="_fpbinject._tcp.local.", + name=n, + state_change=ServiceStateChange.Added, + ) + + async def async_cancel(self): + return None + + return _StubBrowser + + +def _fake_async_service_info_factory(spec_by_name): + class _StubInfo: + def __init__(self, type_, name): + self.type_ = type_ + self.name = name + self._spec = spec_by_name.get(name) + + async def async_request(self, zc, timeout_ms): + return self._spec is not None + + @property + def port(self): + return self._spec["port"] + + @property + def properties(self): + return self._spec["properties"] + + def parsed_scoped_addresses(self): + return list(self._spec.get("addresses", [])) + + return _StubInfo + + +_MIN_PROPS = { + b"txtvers": b"1", + b"version": b"1.6.6", + b"auth": b"token", + b"device": b"none", + b"path": b"/api", +} + + +class TestDiscoverLocalhostNormalization(unittest.TestCase): + """End-to-end: a service announcing 127.0.0.1 + a LAN IP comes back as 127.0.0.1.""" + + def test_loopback_wins_when_advertised_alongside_lan_ip(self): + name = "FPB._fpbinject._tcp.local." + Browser = _fake_async_browser_factory([name]) + Info = _fake_async_service_info_factory( + { + name: { + "port": 5500, + "addresses": ["10.221.101.4", "127.0.0.1"], + "properties": dict(_MIN_PROPS), + } + } + ) + with patch("cli.discover.AsyncServiceBrowser", Browser), patch( + "cli.discover.AsyncServiceInfo", Info + ): + servers = asyncio.run(discover(timeout=0.05)) + self.assertEqual(len(servers), 1) + self.assertEqual(servers[0].host, "127.0.0.1") + self.assertEqual(servers[0].url, "http://127.0.0.1:5500") + + def test_lan_ip_normalized_to_localhost_when_matches_local_interface(self): + name = "FPB._fpbinject._tcp.local." + Browser = _fake_async_browser_factory([name]) + # Only LAN IP advertised; but it matches a local interface IP. + Info = _fake_async_service_info_factory( + { + name: { + "port": 5501, + "addresses": ["10.0.0.5"], + "properties": dict(_MIN_PROPS), + } + } + ) + with patch("cli.discover.AsyncServiceBrowser", Browser), patch( + "cli.discover.AsyncServiceInfo", Info + ), patch( + "cli.discover._local_interface_ips", + return_value=frozenset({"127.0.0.1", "10.0.0.5"}), + ): + servers = asyncio.run(discover(timeout=0.05)) + self.assertEqual(len(servers), 1) + self.assertEqual(servers[0].host, "127.0.0.1") + + def test_truly_remote_ip_kept_as_is(self): + name = "FPB._fpbinject._tcp.local." + Browser = _fake_async_browser_factory([name]) + Info = _fake_async_service_info_factory( + { + name: { + "port": 5500, + "addresses": ["192.168.99.99"], + "properties": dict(_MIN_PROPS), + } + } + ) + with patch("cli.discover.AsyncServiceBrowser", Browser), patch( + "cli.discover.AsyncServiceInfo", Info + ), patch( + "cli.discover._local_interface_ips", + return_value=frozenset({"127.0.0.1", "10.0.0.5"}), + ): + servers = asyncio.run(discover(timeout=0.05)) + self.assertEqual(len(servers), 1) + self.assertEqual(servers[0].host, "192.168.99.99") + + +if __name__ == "__main__": + unittest.main() diff --git a/Tools/WebServer/tests/test_discover_speed.py b/Tools/WebServer/tests/test_discover_speed.py new file mode 100644 index 00000000..fc63ebd6 --- /dev/null +++ b/Tools/WebServer/tests/test_discover_speed.py @@ -0,0 +1,193 @@ +#!/usr/bin/env python3 +"""Speed contract tests for discover() early-return. + +A `host:port` handle lookup must short-circuit as soon as the matching +service is resolved -- it MUST NOT wait the full discovery timeout. +""" + +import asyncio +import sys +import time +import unittest +from pathlib import Path +from unittest.mock import patch + +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from cli.discover import ( # noqa: E402 + FPBServer, + discover, + discover_sync_by_handle, +) + + +def _server(host, port, handle=None): + return FPBServer( + name=f"FPBInject on {handle or host + ':' + str(port)}._fpbinject._tcp.local.", + host=host, + port=port, + version="1.6.6", + auth="none", + device="none", + path="/api", + url=f"http://{host}:{port}", + id="fpb:test", + handle=handle or f"{host}:{port}", + ) + + +def _slow_browser_factory(service_names, fire_after=0.05): + """Stub AsyncServiceBrowser that fires Added events ``fire_after`` + seconds after construction, simulating an mDNS reply.""" + from zeroconf import ServiceStateChange + + class _StubBrowser: + def __init__(self, zc, types, handlers=None): + self.zc = zc + self._handlers = handlers or [] + self._task = asyncio.get_event_loop().create_task(self._fire()) + + async def _fire(self): + await asyncio.sleep(fire_after) + for h in self._handlers: + for n in service_names: + h( + zeroconf=self.zc, + service_type="_fpbinject._tcp.local.", + name=n, + state_change=ServiceStateChange.Added, + ) + + async def async_cancel(self): + self._task.cancel() + try: + await self._task + except asyncio.CancelledError: + pass + + return _StubBrowser + + +def _fast_info_factory(props_by_name): + """AsyncServiceInfo stub with instant async_request.""" + + class _StubInfo: + def __init__(self, type_, name): + self.name = name + self._spec = props_by_name.get(name) + + async def async_request(self, zc, timeout_ms): + return self._spec is not None + + @property + def port(self): + return self._spec["port"] + + @property + def properties(self): + return self._spec["properties"] + + def parsed_scoped_addresses(self): + return list(self._spec.get("addresses", [])) + + return _StubInfo + + +_PROPS = { + b"txtvers": b"1", + b"version": b"1.6.6", + b"auth": b"none", + b"device": b"none", + b"path": b"/api", + b"id": b"fpb:test", +} + + +class TestEarlyReturnByHandle(unittest.TestCase): + """``-s host:port`` exits as soon as the match is resolved.""" + + def test_host_port_handle_short_circuits_well_under_timeout(self): + name = "FPBInject on bench:5500._fpbinject._tcp.local." + Browser = _slow_browser_factory([name], fire_after=0.05) + Info = _fast_info_factory( + { + name: { + "port": 5500, + "addresses": ["127.0.0.1"], + "properties": dict(_PROPS), + } + } + ) + with patch("cli.discover.AsyncServiceBrowser", Browser), patch( + "cli.discover.AsyncServiceInfo", Info + ): + t0 = time.monotonic() + servers = discover_sync_by_handle("bench:5500", timeout=3.0) + elapsed = time.monotonic() - t0 + self.assertEqual(len(servers), 1) + self.assertLess( + elapsed, + 1.0, + f"discover_sync_by_handle should short-circuit; took {elapsed:.2f}s", + ) + + def test_host_port_no_match_waits_full_timeout(self): + # When the requested handle isn't on the LAN, the loop has no signal + # to short-circuit on, so it must wait the full budget. Other servers + # that happen to be visible are still returned (the caller filters + # via find_by_handle). + name = "FPBInject on other:5500._fpbinject._tcp.local." + Browser = _slow_browser_factory([name], fire_after=0.05) + Info = _fast_info_factory( + { + name: { + "port": 5500, + "addresses": ["127.0.0.1"], + "properties": dict(_PROPS), + } + } + ) + with patch("cli.discover.AsyncServiceBrowser", Browser), patch( + "cli.discover.AsyncServiceInfo", Info + ): + t0 = time.monotonic() + servers = discover_sync_by_handle("nope:9999", timeout=0.3) + elapsed = time.monotonic() - t0 + self.assertGreaterEqual(elapsed, 0.25) + self.assertTrue(all(s.handle != "nope:9999" for s in servers)) + + +class TestEarlyMatchPredicate(unittest.TestCase): + """``discover()`` early_match predicate stops the loop on first hit.""" + + def test_early_match_stops_loop(self): + names = [f"FPBInject on srv{i}:5500._fpbinject._tcp.local." for i in range(3)] + Browser = _slow_browser_factory(names, fire_after=0.05) + Info = _fast_info_factory( + { + n: { + "port": 5500 + i, + "addresses": [f"127.0.0.{i + 1}"], + "properties": dict(_PROPS), + } + for i, n in enumerate(names) + } + ) + + def is_first(s): + return s.handle == "srv0:5500" + + with patch("cli.discover.AsyncServiceBrowser", Browser), patch( + "cli.discover.AsyncServiceInfo", Info + ): + t0 = time.monotonic() + servers = asyncio.run(discover(timeout=3.0, early_match=is_first)) + elapsed = time.monotonic() - t0 + self.assertGreaterEqual(len(servers), 1) + self.assertLess( + elapsed, 1.0, f"early_match should short-circuit; took {elapsed:.2f}s" + ) + + +if __name__ == "__main__": + unittest.main() diff --git a/Tools/WebServer/tests/test_fpb_cli.py b/Tools/WebServer/tests/test_fpb_cli.py index 7e863f69..e0cc85aa 100644 --- a/Tools/WebServer/tests/test_fpb_cli.py +++ b/Tools/WebServer/tests/test_fpb_cli.py @@ -155,56 +155,6 @@ def test_init_no_port_no_probe(self): finally: cli.cleanup() - @patch("cli.fpb_cli.ServerProxy") - def test_try_attach_local_server_with_device(self, mock_proxy_cls): - """try_attach_local_server attaches when local server has a connected device.""" - proxy = MagicMock() - proxy.probe_status.return_value = {"success": True, "connected": True} - proxy.is_device_connected.return_value = True - mock_proxy_cls.return_value = proxy - - cli = FPBCLI() - try: - self.assertTrue(cli.try_attach_local_server()) - self.assertIs(cli._proxy, proxy) - self.assertTrue(cli._device_state.connected) - proxy.connect.assert_not_called() - # Single status probe, no extra is_server_running()/is_device_connected() round-trip. - proxy.probe_status.assert_called_once() - proxy.is_server_running.assert_not_called() - finally: - cli.cleanup() - - @patch("cli.fpb_cli.ServerProxy") - def test_try_attach_local_server_without_device(self, mock_proxy_cls): - """Server running but no device → try_attach_local_server stays offline.""" - proxy = MagicMock() - proxy.probe_status.return_value = {"success": True, "connected": False} - mock_proxy_cls.return_value = proxy - - cli = FPBCLI() - try: - self.assertFalse(cli.try_attach_local_server()) - self.assertIsNone(cli._proxy) - self.assertFalse(cli._device_state.connected) - proxy.connect.assert_not_called() - finally: - cli.cleanup() - - @patch("cli.fpb_cli.ServerProxy") - def test_try_attach_local_server_unreachable(self, mock_proxy_cls): - """No server running → try_attach_local_server returns False, no exception.""" - proxy = MagicMock() - proxy.probe_status.return_value = {} - mock_proxy_cls.return_value = proxy - - cli = FPBCLI() - try: - self.assertFalse(cli.try_attach_local_server()) - self.assertIsNone(cli._proxy) - finally: - cli.cleanup() - def test_output_json(self): """Test JSON output formatting""" data = {"success": True, "message": "Test"} @@ -2033,9 +1983,10 @@ def test_main_with_port_and_baudrate(self): mock_cli = MagicMock() mock_cli_class.return_value = mock_cli main() - call_kwargs = mock_cli_class.call_args.kwargs - self.assertEqual(call_kwargs.get("port"), "/dev/ttyACM0") - self.assertEqual(call_kwargs.get("baudrate"), 9600) + plan = mock_cli_class.call_args.kwargs.get("plan") + self.assertIsNotNone(plan) + self.assertEqual(plan.serial_port, "/dev/ttyACM0") + self.assertEqual(plan.baudrate, 9600) def test_main_file_list_command(self): """Test main with file-list command""" diff --git a/Tools/WebServer/tests/test_handle_cache.py b/Tools/WebServer/tests/test_handle_cache.py new file mode 100644 index 00000000..dae69202 --- /dev/null +++ b/Tools/WebServer/tests/test_handle_cache.py @@ -0,0 +1,164 @@ +#!/usr/bin/env python3 +"""Tests for cli/handle_cache.py. + +Pins: TTL boundary, atomic write, FPB_NO_CACHE bypass, daemon thread, +and the wire-up that makes -s host:port hit the cache before mDNS. +""" + +import json +import os +import sys +import tempfile +import time +import unittest +from pathlib import Path +from unittest.mock import patch + +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from cli import handle_cache # noqa: E402 + + +class TempCache: + """Context-manager wrapping XDG_CACHE_HOME so each test has its own.""" + + def __enter__(self): + self._tmp = tempfile.mkdtemp(prefix="fpb-test-cache-") + self._old_xdg = os.environ.get("XDG_CACHE_HOME") + os.environ["XDG_CACHE_HOME"] = self._tmp + os.environ.pop("FPB_NO_CACHE", None) + return Path(self._tmp) + + def __exit__(self, *exc): + if self._old_xdg is None: + os.environ.pop("XDG_CACHE_HOME", None) + else: + os.environ["XDG_CACHE_HOME"] = self._old_xdg + import shutil + + shutil.rmtree(self._tmp, ignore_errors=True) + + +class TestStoreAndLookup(unittest.TestCase): + def test_store_then_lookup_returns_url(self): + with TempCache(): + handle_cache.store( + "bench:5500", url="http://1.2.3.4:5500", server_id="fpb:abc" + ) + entry = handle_cache.lookup("bench:5500") + self.assertIsNotNone(entry) + self.assertEqual(entry["url"], "http://1.2.3.4:5500") + self.assertEqual(entry["id"], "fpb:abc") + + def test_lookup_unknown_handle(self): + with TempCache(): + self.assertIsNone(handle_cache.lookup("nope:9999")) + + def test_lookup_stale_entry_returns_none(self): + with TempCache(): + handle_cache.store("old:5500", url="http://x:5500") + entries = handle_cache._read_all() + entries["old:5500"]["cached_at"] = time.time() - 25 * 3600 + handle_cache._write_all(entries) + self.assertIsNone(handle_cache.lookup("old:5500", ttl_s=24 * 3600)) + + def test_invalidate_drops_entry(self): + with TempCache(): + handle_cache.store("bench:5500", url="http://x:5500") + handle_cache.invalidate("bench:5500") + self.assertIsNone(handle_cache.lookup("bench:5500")) + + +class TestNoCacheEnv(unittest.TestCase): + def test_disabled_lookup_returns_none(self): + with TempCache(), patch.dict(os.environ, {"FPB_NO_CACHE": "1"}): + handle_cache.store("bench:5500", url="http://1.2.3.4:5500") + self.assertIsNone(handle_cache.lookup("bench:5500")) + + def test_disabled_store_writes_nothing(self): + with TempCache() as cache_dir, patch.dict(os.environ, {"FPB_NO_CACHE": "1"}): + handle_cache.store("bench:5500", url="http://1.2.3.4:5500") + self.assertFalse((cache_dir / "fpbinject" / "handles.json").exists()) + + +class TestAtomicWrite(unittest.TestCase): + def test_no_partial_file_on_replace(self): + with TempCache() as cache_dir: + handle_cache.store("a:1", url="http://a:1") + handle_cache.store("b:2", url="http://b:2") + cache_file = cache_dir / "fpbinject" / "handles.json" + with cache_file.open() as f: + blob = json.load(f) + self.assertEqual(blob["version"], 1) + self.assertEqual(blob["entries"]["a:1"]["url"], "http://a:1") + self.assertEqual(blob["entries"]["b:2"]["url"], "http://b:2") + + def test_corrupt_file_treated_as_empty(self): + with TempCache() as cache_dir: + (cache_dir / "fpbinject").mkdir() + (cache_dir / "fpbinject" / "handles.json").write_text("{not json") + self.assertIsNone(handle_cache.lookup("any:1")) + handle_cache.store("a:1", url="http://a:1") + self.assertEqual(handle_cache.lookup("a:1")["url"], "http://a:1") + + +class TestSpawnRefreshIsDaemon(unittest.TestCase): + def test_thread_is_daemon(self): + called = [] + t = handle_cache.spawn_refresh(lambda: called.append(1)) + t.join(timeout=1.0) + self.assertTrue(t.daemon) + self.assertEqual(called, [1]) + + +class TestResolverCacheIntegration(unittest.TestCase): + """End-to-end: -s host:port hits cache, refreshes async.""" + + def test_cache_hit_skips_mdns_and_spawns_refresh(self): + with TempCache(): + handle_cache.store( + "bench:5500", url="http://127.0.0.1:5500", server_id="fpb:abc" + ) + + from cli.fpb_cli import _resolve_handle_to_url + + with patch("cli.fpb_cli.discover_sync_by_handle") as mock_disc, patch( + "cli.handle_cache.spawn_refresh" + ) as mock_spawn: + url = _resolve_handle_to_url("bench:5500", source="-s flag") + + self.assertEqual(url, "http://127.0.0.1:5500") + mock_disc.assert_not_called() + mock_spawn.assert_called_once() + + def test_cache_miss_falls_back_to_mdns_and_stores(self): + with TempCache(): + from cli.discover import FPBServer + from cli.fpb_cli import _resolve_handle_to_url + + fake_server = FPBServer( + name="FPBInject on bench:5501._fpbinject._tcp.local.", + host="127.0.0.1", + port=5501, + version="1.6.6", + auth="none", + device="none", + path="/api", + url="http://127.0.0.1:5501", + id="fpb:zzz", + handle="bench:5501", + ) + with patch( + "cli.fpb_cli.discover_sync_by_handle", return_value=[fake_server] + ): + url = _resolve_handle_to_url("bench:5501", source="-s flag") + + self.assertEqual(url, "http://127.0.0.1:5501") + entry = handle_cache.lookup("bench:5501") + self.assertIsNotNone(entry) + self.assertEqual(entry["url"], "http://127.0.0.1:5501") + self.assertEqual(entry["id"], "fpb:zzz") + + +if __name__ == "__main__": + unittest.main() diff --git a/Tools/WebServer/tests/test_handle_resolution.py b/Tools/WebServer/tests/test_handle_resolution.py new file mode 100644 index 00000000..4945359c --- /dev/null +++ b/Tools/WebServer/tests/test_handle_resolution.py @@ -0,0 +1,239 @@ +#!/usr/bin/env python3 +"""Tests for the -s/--server / FPB_SERVER handle resolution. + +Pins the user-facing ergonomics: one flag accepts URL, host:port, or host. +""" + +import argparse +import os +import sys +import unittest +from pathlib import Path +from unittest.mock import patch + +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from cli.connection_plan import CommandPolicy, ConnectionMode # noqa: E402 +from cli.discover import ( # noqa: E402 + FPBServer, + classify_handle, + find_by_handle, +) + + +def _ns(**kwargs): + defaults = dict( + command="info", + command_policy=CommandPolicy.DEVICE, + server=None, + server_url_legacy=None, + no_discovery=False, + token=None, + port=None, + baudrate=115200, + direct=False, + verbose=False, + ) + defaults.update(kwargs) + return argparse.Namespace(**defaults) + + +def _server(host, port, name=None, handle=None): + return FPBServer( + name=name or f"FPBInject on {host}:{port}._fpbinject._tcp.local.", + host=host, + port=port, + version="1.6.6", + auth="token", + device="none", + path="/api", + url=f"http://{host}:{port}", + id=f"fpb:{host}-{port}", + handle=handle or f"{host}:{port}", + ) + + +class TestClassifyHandle(unittest.TestCase): + """Three distinct shapes of -s value.""" + + def test_url_with_scheme(self): + self.assertEqual(classify_handle("http://1.2.3.4:5500"), "url") + self.assertEqual(classify_handle("https://x.y.z:5500/api"), "url") + + def test_host_port(self): + self.assertEqual(classify_handle("bench:5501"), "host_port") + self.assertEqual(classify_handle("192.168.1.20:5500"), "host_port") + + def test_host_only(self): + self.assertEqual(classify_handle("bench"), "host") + self.assertEqual(classify_handle("bench.local"), "host") + + def test_host_port_with_non_numeric_port_falls_back_to_host(self): + # ipv6-ish or weird strings: not host:port. + self.assertEqual(classify_handle("foo:bar"), "host") + + +class TestFindByHandle(unittest.TestCase): + """Filter discovered list by user-supplied handle.""" + + def setUp(self): + self.servers = [ + _server("bench", 5500), + _server("bench", 5501), + _server("lab", 5500), + ] + + def test_host_port_exact(self): + matches = find_by_handle(self.servers, "bench:5501") + self.assertEqual(len(matches), 1) + self.assertEqual(matches[0].port, 5501) + + def test_host_only_matches_all_with_that_host(self): + matches = find_by_handle(self.servers, "bench") + self.assertEqual(len(matches), 2) + + def test_host_only_unique_returns_one(self): + matches = find_by_handle(self.servers, "lab") + self.assertEqual(len(matches), 1) + + def test_no_match(self): + self.assertEqual(find_by_handle(self.servers, "nope"), []) + + +def _import_resolver(): + from cli.fpb_cli import resolve_connection_plan # noqa: E402 + + return resolve_connection_plan + + +class TestServerFlagResolves(unittest.TestCase): + """End-to-end: -s handle goes through resolver and produces correct URL.""" + + @patch.dict(os.environ, {}, clear=True) + def test_url_in_s_flag_used_verbatim(self): + resolve = _import_resolver() + with patch("cli.fpb_cli.discover_sync_by_handle") as mock_disc: + plan = resolve(_ns(server="http://1.2.3.4:5500")) + self.assertEqual(plan.server_url, "http://1.2.3.4:5500") + self.assertEqual(plan.mode, ConnectionMode.REMOTE_PROXY) + mock_disc.assert_not_called() + + @patch.dict(os.environ, {}, clear=True) + def test_host_port_resolved_via_mdns(self): + resolve = _import_resolver() + with patch( + "cli.fpb_cli.discover_sync_by_handle", + return_value=[_server("127.0.0.1", 5501, handle="bench:5501")], + ): + plan = resolve(_ns(server="bench:5501")) + self.assertEqual(plan.server_url, "http://127.0.0.1:5501") + self.assertEqual(plan.mode, ConnectionMode.LOCAL_PROXY) + + @patch.dict(os.environ, {}, clear=True) + def test_host_only_unique_resolves(self): + resolve = _import_resolver() + with patch( + "cli.fpb_cli.discover_sync_by_handle", + return_value=[_server("127.0.0.1", 5500, handle="bench:5500")], + ): + plan = resolve(_ns(server="bench")) + self.assertEqual(plan.server_url, "http://127.0.0.1:5500") + + @patch.dict(os.environ, {}, clear=True) + def test_host_only_ambiguous_raises(self): + from cli.fpb_cli import AmbiguousServerError + + resolve = _import_resolver() + with patch( + "cli.fpb_cli.discover_sync_by_handle", + return_value=[ + _server("127.0.0.1", 5500, handle="bench:5500"), + _server("127.0.0.1", 5501, handle="bench:5501"), + ], + ): + with self.assertRaises(AmbiguousServerError) as cm: + resolve(_ns(server="bench")) + self.assertIn("ambiguous", str(cm.exception)) + self.assertEqual(cm.exception.exit_code, 2) + + @patch.dict(os.environ, {}, clear=True) + def test_handle_no_match_raises(self): + from cli.fpb_cli import FPBCLIError + + resolve = _import_resolver() + with patch("cli.fpb_cli.discover_sync_by_handle", return_value=[]): + with self.assertRaises(FPBCLIError) as cm: + resolve(_ns(server="nope:5500")) + self.assertIn("No FPBInject server matches", str(cm.exception)) + + +class TestFpbServerEnv(unittest.TestCase): + """FPB_SERVER env var goes through the same handle resolution.""" + + @patch.dict(os.environ, {"FPB_SERVER": "http://1.2.3.4:5500"}, clear=False) + def test_env_url(self): + resolve = _import_resolver() + plan = resolve(_ns(server=None)) + self.assertEqual(plan.server_url, "http://1.2.3.4:5500") + + @patch.dict(os.environ, {"FPB_SERVER": "bench:5501"}, clear=False) + def test_env_handle_resolved(self): + resolve = _import_resolver() + with patch( + "cli.fpb_cli.discover_sync_by_handle", + return_value=[_server("127.0.0.1", 5501, handle="bench:5501")], + ): + plan = resolve(_ns(server=None)) + self.assertEqual(plan.server_url, "http://127.0.0.1:5501") + + +class TestServerFlagWinsOverEnv(unittest.TestCase): + @patch.dict(os.environ, {"FPB_SERVER": "http://env.host:7777"}, clear=False) + def test_flag_overrides_env(self): + resolve = _import_resolver() + plan = resolve(_ns(server="http://flag.host:8888")) + self.assertEqual(plan.server_url, "http://flag.host:8888") + + +class TestLegacyServerUrlDeprecation(unittest.TestCase): + """--server-url and FPB_SERVER_URL still work but warn under -v.""" + + @patch.dict(os.environ, {}, clear=True) + def test_legacy_flag_still_works(self): + resolve = _import_resolver() + plan = resolve(_ns(server_url_legacy="http://legacy.host:5500")) + self.assertEqual(plan.server_url, "http://legacy.host:5500") + + @patch.dict(os.environ, {}, clear=True) + def test_legacy_flag_warns_under_verbose(self): + import io + from contextlib import redirect_stderr + + resolve = _import_resolver() + err = io.StringIO() + with redirect_stderr(err): + resolve(_ns(server_url_legacy="http://legacy.host:5500", verbose=True)) + self.assertIn("deprecated", err.getvalue()) + + @patch.dict(os.environ, {"FPB_SERVER_URL": "http://legacy.env:5500"}, clear=False) + def test_legacy_env_still_works(self): + resolve = _import_resolver() + plan = resolve(_ns(server=None, server_url_legacy=None)) + self.assertEqual(plan.server_url, "http://legacy.env:5500") + + +class TestNewFlagWinsOverLegacy(unittest.TestCase): + @patch.dict(os.environ, {}, clear=True) + def test_s_flag_wins_over_server_url(self): + resolve = _import_resolver() + plan = resolve( + _ns( + server="http://new.host:5500", + server_url_legacy="http://old.host:5500", + ) + ) + self.assertEqual(plan.server_url, "http://new.host:5500") + + +if __name__ == "__main__": + unittest.main() diff --git a/Tools/WebServer/tests/test_mdns_advertiser.py b/Tools/WebServer/tests/test_mdns_advertiser.py new file mode 100644 index 00000000..8ca0965c --- /dev/null +++ b/Tools/WebServer/tests/test_mdns_advertiser.py @@ -0,0 +1,235 @@ +#!/usr/bin/env python3 +"""Test cases for services.mdns_advertiser.MdnsAdvertiser. + +The advertiser owns its own Zeroconf instance; tests patch it at the import +boundary so no real multicast socket is opened. +""" + +import os +import signal +import sys +import unittest +from pathlib import Path +from unittest.mock import patch + +sys.path.insert(0, str(Path(__file__).parent.parent)) + + +def _import_advertiser(): + """Lazy import so RED phase fails with ImportError, not module-load error.""" + from services.mdns_advertiser import MdnsAdvertiser # noqa: E402 + + return MdnsAdvertiser + + +def _make_advertiser(**overrides): + """Construct an advertiser with sane defaults for tests.""" + Cls = _import_advertiser() + kwargs = { + "port": 5500, + "version": "1.6.6", + "auth_mode": "token", + "path": "/api", + "install_signal_handlers": False, + } + kwargs.update(overrides) + return Cls(**kwargs) + + +def _txt_dict_from_register_call(mock_zc): + """Pull the TXT-properties dict out of register_service(info) call.""" + register_call = mock_zc.register_service.call_args + assert register_call is not None, "register_service was not called" + info = register_call.args[0] + return info.properties + + +class TestMdnsAdvertiserRegister(unittest.TestCase): + """Service registration with correct TXT records.""" + + @patch("services.mdns_advertiser.Zeroconf") + def test_register_uses_fpbinject_service_type(self, MockZeroconf): + zc = MockZeroconf.return_value + adv = _make_advertiser() + adv.register() + info = zc.register_service.call_args.args[0] + self.assertEqual(info.type, "_fpbinject._tcp.local.") + self.assertTrue(info.name.endswith("._fpbinject._tcp.local.")) + + @patch("services.mdns_advertiser.Zeroconf") + def test_register_publishes_required_txt_keys(self, MockZeroconf): + adv = _make_advertiser() + adv.register() + txt = _txt_dict_from_register_call(MockZeroconf.return_value) + keys = {k.decode() if isinstance(k, bytes) else k for k in txt.keys()} + self.assertIn("txtvers", keys) + self.assertIn("version", keys) + self.assertIn("auth", keys) + self.assertIn("device", keys) + self.assertIn("path", keys) + + @patch("services.mdns_advertiser.Zeroconf") + def test_register_advertises_port(self, MockZeroconf): + adv = _make_advertiser(port=8080) + adv.register() + info = MockZeroconf.return_value.register_service.call_args.args[0] + self.assertEqual(info.port, 8080) + + +class TestMdnsAdvertiserAuthIntent(unittest.TestCase): + """auth TXT reflects advertised intent, not effective state.""" + + @patch("services.mdns_advertiser.Zeroconf") + def test_auth_token_when_auth_mode_token(self, MockZeroconf): + adv = _make_advertiser(auth_mode="token") + adv.register() + txt = _txt_dict_from_register_call(MockZeroconf.return_value) + auth_val = txt[b"auth"] if b"auth" in txt else txt["auth"] + self.assertEqual( + auth_val.decode() if isinstance(auth_val, bytes) else auth_val, + "token", + ) + + @patch("services.mdns_advertiser.Zeroconf") + def test_auth_none_when_auth_mode_none(self, MockZeroconf): + adv = _make_advertiser(auth_mode="none") + adv.register() + txt = _txt_dict_from_register_call(MockZeroconf.return_value) + auth_val = txt[b"auth"] if b"auth" in txt else txt["auth"] + self.assertEqual( + auth_val.decode() if isinstance(auth_val, bytes) else auth_val, + "none", + ) + + +class TestMdnsAdvertiserNoTokenLeak(unittest.TestCase): + """The actual auth token must never appear in TXT records.""" + + @patch("services.mdns_advertiser.Zeroconf") + def test_txt_never_contains_token_key(self, MockZeroconf): + adv = _make_advertiser() + adv.register() + txt = _txt_dict_from_register_call(MockZeroconf.return_value) + keys_lower = { + (k.decode() if isinstance(k, bytes) else k).lower() for k in txt.keys() + } + self.assertNotIn("token", keys_lower) + self.assertNotIn("secret", keys_lower) + self.assertNotIn("apikey", keys_lower) + self.assertNotIn("api_key", keys_lower) + + +class TestMdnsAdvertiserDeviceTxtV1(unittest.TestCase): + """v1 contract: device TXT is published once at startup as 'none'.""" + + @patch("services.mdns_advertiser.Zeroconf") + def test_device_is_none_at_register(self, MockZeroconf): + adv = _make_advertiser() + adv.register() + txt = _txt_dict_from_register_call(MockZeroconf.return_value) + device_val = txt[b"device"] if b"device" in txt else txt["device"] + self.assertEqual( + device_val.decode() if isinstance(device_val, bytes) else device_val, + "none", + ) + + +class TestMdnsAdvertiserUpdateDeviceState(unittest.TestCase): + """update_device_state() shipped + tested for forward compat (unused in v1).""" + + @patch("services.mdns_advertiser.Zeroconf") + def test_update_device_state_calls_update_service(self, MockZeroconf): + zc = MockZeroconf.return_value + adv = _make_advertiser() + adv.register() + adv.update_device_state("connected") + self.assertTrue(zc.update_service.called) + + @patch("services.mdns_advertiser.Zeroconf") + def test_update_device_state_preserves_id_and_all_txt_keys(self, MockZeroconf): + adv = _make_advertiser() + adv.register() + register_keys = set(adv._build_txt("none").keys()) + + connected_props = adv._build_txt("connected") + self.assertEqual(set(connected_props.keys()), register_keys) + self.assertEqual(connected_props["id"], adv._server_id) + self.assertEqual(connected_props["device"], "connected") + self.assertNotEqual(connected_props["id"], "") + + +class TestMdnsAdvertiserIdempotentUnregister(unittest.TestCase): + """unregister() is safe to call repeatedly.""" + + @patch("services.mdns_advertiser.Zeroconf") + def test_double_unregister_calls_unregister_service_once(self, MockZeroconf): + zc = MockZeroconf.return_value + adv = _make_advertiser() + adv.register() + adv.unregister() + adv.unregister() + self.assertEqual(zc.unregister_service.call_count, 1) + + @patch("services.mdns_advertiser.Zeroconf") + def test_unregister_without_register_is_noop(self, MockZeroconf): + zc = MockZeroconf.return_value + adv = _make_advertiser() + adv.unregister() + self.assertEqual(zc.unregister_service.call_count, 0) + + +class TestMdnsAdvertiserSignalHandlers(unittest.TestCase): + """Signal-handler install policy. + + Default (install_signal_handlers=None): install unless PYTEST_CURRENT_TEST is set. + Explicit True: install always. + Explicit False: never install. + """ + + @patch("services.mdns_advertiser.Zeroconf") + @patch("services.mdns_advertiser.signal.signal") + def test_signal_handlers_skipped_when_explicitly_disabled( + self, mock_signal, MockZeroconf + ): + adv = _make_advertiser(install_signal_handlers=False) + adv.register() + self.assertFalse(mock_signal.called) + + @patch("services.mdns_advertiser.Zeroconf") + @patch("services.mdns_advertiser.signal.signal") + def test_signal_handlers_installed_when_explicitly_enabled( + self, mock_signal, MockZeroconf + ): + adv = _make_advertiser(install_signal_handlers=True) + adv.register() + installed = {call.args[0] for call in mock_signal.call_args_list} + self.assertIn(signal.SIGINT, installed) + self.assertIn(signal.SIGTERM, installed) + + @patch.dict(os.environ, {"PYTEST_CURRENT_TEST": "test_x"}, clear=False) + @patch("services.mdns_advertiser.Zeroconf") + @patch("services.mdns_advertiser.signal.signal") + def test_signal_handlers_default_skipped_under_pytest( + self, mock_signal, MockZeroconf + ): + Cls = _import_advertiser() + adv = Cls(port=5500, version="1", auth_mode="none") + adv.register() + self.assertFalse(mock_signal.called) + + +class TestMdnsAdvertiserAtexitHook(unittest.TestCase): + """atexit handler is registered so graceful exits unregister.""" + + @patch("services.mdns_advertiser.Zeroconf") + @patch("services.mdns_advertiser.atexit.register") + def test_register_installs_atexit_unregister(self, mock_atexit, MockZeroconf): + adv = _make_advertiser() + adv.register() + self.assertTrue(mock_atexit.called) + cb = mock_atexit.call_args.args[0] + self.assertEqual(cb, adv.unregister) + + +if __name__ == "__main__": + unittest.main() diff --git a/Tools/WebServer/tests/test_mdns_discovery.py b/Tools/WebServer/tests/test_mdns_discovery.py new file mode 100644 index 00000000..be7d0ac6 --- /dev/null +++ b/Tools/WebServer/tests/test_mdns_discovery.py @@ -0,0 +1,350 @@ +#!/usr/bin/env python3 +"""Test cases for mDNS discovery (cli.discover) and CLI integration. + +Covers scenarios: + S2 single-result attach + S3 zero-result fallback to localhost + S4 multi-result list+exit-2 + S5 explicit --server-url / FPB_SERVER_URL bypass + S6 --no-discovery bypass + S7 offline subcommand zero-delay (no discovery call) + S8 'discover' subcommand emits JSON list + Plus the underlying discover() async semantics. +""" + +import argparse +import asyncio +import io +import json +import os +import sys +import time +import unittest +from contextlib import redirect_stdout, redirect_stderr +from pathlib import Path +from unittest.mock import patch + +sys.path.insert(0, str(Path(__file__).parent.parent)) + + +def _import_discover(): + from cli.discover import discover, discover_sync, FPBServer # noqa: E402 + + return discover, discover_sync, FPBServer + + +def _import_cli_helpers(): + """Lazy import of CLI helpers added by T8 (resolve_server_url, cmd_discover). + + These live in cli.fpb_cli; importing eagerly would break tests during the + RED phase before T8 wires them in. + """ + from cli.fpb_cli import resolve_server_url, cmd_discover # noqa: E402 + + return resolve_server_url, cmd_discover + + +def _fake_async_browser_factory(service_names_to_emit): + """Build a stub AsyncServiceBrowser class that fires Added events for + each service name as soon as it is constructed. + + The real AsyncServiceBrowser signature: AsyncServiceBrowser(zc, types, handlers=...). + """ + from zeroconf import ServiceStateChange + + class _StubBrowser: + def __init__(self, zc, types, handlers=None): + self.zc = zc + self.types = types + handler_list = handlers or [] + for h in handler_list: + for name in service_names_to_emit: + h( + zeroconf=zc, + service_type="_fpbinject._tcp.local.", + name=name, + state_change=ServiceStateChange.Added, + ) + + async def async_cancel(self): + return None + + return _StubBrowser + + +def _fake_async_service_info_factory(props_by_name): + """Build a stub AsyncServiceInfo class that returns canned TXT/port/ip. + + props_by_name maps service-name -> dict(port=..., addresses=[bytes], properties={k: bytes}) + """ + + class _StubInfo: + def __init__(self, type_, name): + self.type_ = type_ + self.name = name + self._spec = props_by_name.get(name) + + async def async_request(self, zc, timeout_ms): + return self._spec is not None + + @property + def port(self): + return self._spec["port"] + + @property + def properties(self): + return self._spec["properties"] + + def parsed_scoped_addresses(self): + return self._spec.get("addresses", ["127.0.0.1"]) + + return _StubInfo + + +# ---- discover() async semantics (S2/S3/S4 module-level) ---------------------- + + +class TestDiscoverAsync(unittest.TestCase): + """discover() async semantics — uses patched zeroconf classes.""" + + def test_discover_zero_results(self): + discover, _, _ = _import_discover() + Browser = _fake_async_browser_factory([]) + Info = _fake_async_service_info_factory({}) + with patch("cli.discover.AsyncServiceBrowser", Browser), patch( + "cli.discover.AsyncServiceInfo", Info + ): + servers = asyncio.run(discover(timeout=0.05)) + self.assertEqual(servers, []) + + def test_discover_one_result(self): + _, _, FPBServer = _import_discover() + discover, _, _ = _import_discover() + name = "FPB._fpbinject._tcp.local." + Browser = _fake_async_browser_factory([name]) + Info = _fake_async_service_info_factory( + { + name: { + "port": 5500, + "addresses": ["192.168.1.20"], + "properties": { + b"txtvers": b"1", + b"version": b"1.6.6", + b"auth": b"token", + b"device": b"none", + b"path": b"/api", + }, + } + } + ) + with patch("cli.discover.AsyncServiceBrowser", Browser), patch( + "cli.discover.AsyncServiceInfo", Info + ): + servers = asyncio.run(discover(timeout=0.1)) + self.assertEqual(len(servers), 1) + s = servers[0] + self.assertEqual(s.host, "192.168.1.20") + self.assertEqual(s.port, 5500) + self.assertEqual(s.version, "1.6.6") + self.assertEqual(s.auth, "token") + self.assertEqual(s.device, "none") + self.assertEqual(s.path, "/api") + self.assertEqual(s.url, "http://192.168.1.20:5500") + + def test_discover_many_results(self): + discover, _, _ = _import_discover() + names = [f"FPB-{i}._fpbinject._tcp.local." for i in range(3)] + Browser = _fake_async_browser_factory(names) + Info = _fake_async_service_info_factory( + { + n: { + "port": 5500 + i, + "addresses": [f"10.0.0.{10 + i}"], + "properties": { + b"txtvers": b"1", + b"version": b"1.6.6", + b"auth": b"token", + b"device": b"none", + b"path": b"/api", + }, + } + for i, n in enumerate(names) + } + ) + with patch("cli.discover.AsyncServiceBrowser", Browser), patch( + "cli.discover.AsyncServiceInfo", Info + ): + servers = asyncio.run(discover(timeout=0.1)) + self.assertEqual(len(servers), 3) + + +# ---- resolve_server_url() precedence ladder (S2/S3/S4/S5/S6/S7) -------------- + + +def _ns(**kwargs): + """Build an argparse.Namespace with the fields resolve_server_url cares about. + + Defaults match what build_parser will populate; tests override per scenario. + """ + from cli.connection_plan import CommandPolicy + + if "requires_server" in kwargs: + # Back-compat for callers using the legacy boolean. + kwargs.setdefault( + "command_policy", + ( + CommandPolicy.DEVICE + if kwargs.pop("requires_server") + else CommandPolicy.OFFLINE + ), + ) + defaults = dict( + command=None, + server_url=None, + no_discovery=False, + command_policy=CommandPolicy.DEVICE, + token=None, + ) + defaults.update(kwargs) + return argparse.Namespace(**defaults) + + +def _fake_server(host="192.168.1.20", port=5500, version="1.6.6"): + _, _, FPBServer = _import_discover() + return FPBServer( + name=f"fake-{host}", + host=host, + port=port, + version=version, + auth="token", + device="none", + path="/api", + url=f"http://{host}:{port}", + id=f"fpb:fake-{host}-{port}", + handle=f"{host}:{port}", + ) + + +class TestResolveServerUrl(unittest.TestCase): + """Precedence ladder for resolve_server_url().""" + + def test_explicit_server_url_bypasses_discovery(self): + # S5: explicit flag wins, never browses + resolve_server_url, _ = _import_cli_helpers() + with patch("cli.fpb_cli.discover_sync") as mock_disc: + url = resolve_server_url( + _ns(server_url="http://1.2.3.4:9999", requires_server=True) + ) + self.assertEqual(url, "http://1.2.3.4:9999") + mock_disc.assert_not_called() + + @patch.dict(os.environ, {"FPB_SERVER_URL": "http://env.host:7777"}, clear=False) + def test_env_server_url_used_when_no_flag(self): + # S5 via env + resolve_server_url, _ = _import_cli_helpers() + with patch("cli.fpb_cli.discover_sync") as mock_disc: + url = resolve_server_url(_ns(server_url=None, requires_server=True)) + self.assertEqual(url, "http://env.host:7777") + mock_disc.assert_not_called() + + @patch.dict(os.environ, {}, clear=True) + def test_offline_subcommand_skips_discovery(self): + # S7: requires_server=False short-circuits, no 1s delay + resolve_server_url, _ = _import_cli_helpers() + with patch("cli.fpb_cli.discover_sync") as mock_disc: + t0 = time.monotonic() + url = resolve_server_url(_ns(server_url=None, requires_server=False)) + elapsed = time.monotonic() - t0 + self.assertIsNone(url) + mock_disc.assert_not_called() + self.assertLess(elapsed, 0.1, f"S7 delay budget exceeded: {elapsed:.3f}s") + + @patch.dict(os.environ, {}, clear=True) + def test_no_discovery_flag_falls_back_to_localhost(self): + # S6 + resolve_server_url, _ = _import_cli_helpers() + with patch("cli.fpb_cli.discover_sync") as mock_disc: + url = resolve_server_url( + _ns(server_url=None, no_discovery=True, requires_server=True) + ) + from cli.server_proxy import DEFAULT_SERVER_URL + + self.assertEqual(url, DEFAULT_SERVER_URL) + mock_disc.assert_not_called() + + @patch.dict(os.environ, {}, clear=True) + def test_zero_results_falls_back_to_localhost(self): + # S3 + resolve_server_url, _ = _import_cli_helpers() + with patch("cli.fpb_cli.discover_sync", return_value=[]): + url = resolve_server_url(_ns(server_url=None, requires_server=True)) + from cli.server_proxy import DEFAULT_SERVER_URL + + self.assertEqual(url, DEFAULT_SERVER_URL) + + @patch.dict(os.environ, {}, clear=True) + def test_one_result_returned_silently(self): + # S2 + resolve_server_url, _ = _import_cli_helpers() + with patch("cli.fpb_cli.discover_sync", return_value=[_fake_server(port=5500)]): + url = resolve_server_url(_ns(server_url=None, requires_server=True)) + self.assertEqual(url, "http://192.168.1.20:5500") + + @patch.dict(os.environ, {}, clear=True) + def test_two_results_lists_and_exits_2(self): + # S4 + resolve_server_url, _ = _import_cli_helpers() + servers = [ + _fake_server(host="10.0.0.10", port=5500), + _fake_server(host="10.0.0.11", port=5500), + ] + with patch("cli.fpb_cli.discover_sync", return_value=servers): + err = io.StringIO() + with redirect_stderr(err): + with self.assertRaises(SystemExit) as cm: + resolve_server_url(_ns(server_url=None, requires_server=True)) + self.assertEqual(cm.exception.code, 2) + self.assertIn("10.0.0.10:5500", err.getvalue()) + self.assertIn("10.0.0.11:5500", err.getvalue()) + self.assertIn("-s ", err.getvalue()) + + +# ---- cmd_discover() JSON output (S8) ---------------------------------------- + + +class TestCmdDiscoverJson(unittest.TestCase): + """`fpb_cli.py discover` emits a valid JSON list.""" + + def test_discover_subcommand_outputs_json_list(self): + _, cmd_discover = _import_cli_helpers() + servers = [ + _fake_server(host="10.0.0.10", port=5500), + _fake_server(host="10.0.0.11", port=5501), + ] + with patch("cli.fpb_cli.discover_sync", return_value=servers): + out = io.StringIO() + with redirect_stdout(out): + rc = cmd_discover(_ns(timeout=0.1, json=True)) + self.assertEqual(rc, 0) + parsed = json.loads(out.getvalue()) + self.assertIsInstance(parsed, list) + self.assertEqual(len(parsed), 2) + self.assertIn("url", parsed[0]) + self.assertIn("version", parsed[0]) + self.assertIn("auth", parsed[0]) + self.assertIn("handle", parsed[0]) + self.assertIn("id", parsed[0]) + + def test_discover_subcommand_empty_emits_empty_list(self): + _, cmd_discover = _import_cli_helpers() + with patch("cli.fpb_cli.discover_sync", return_value=[]): + out = io.StringIO() + with redirect_stdout(out): + rc = cmd_discover(_ns(timeout=0.1, json=True)) + self.assertEqual(rc, 0) + parsed = json.loads(out.getvalue()) + self.assertEqual(parsed, []) + + +if __name__ == "__main__": + unittest.main() diff --git a/Tools/WebServer/tests/test_resolve_connection_plan.py b/Tools/WebServer/tests/test_resolve_connection_plan.py new file mode 100644 index 00000000..ea421b5d --- /dev/null +++ b/Tools/WebServer/tests/test_resolve_connection_plan.py @@ -0,0 +1,282 @@ +#!/usr/bin/env python3 +"""Tests for resolve_connection_plan() — the new single resolver. + +Pins the precedence ladder (Decision Matrix in Discovery.md). Each test +fixes ONE row of the matrix so a regression points at the exact rule. +""" + +import argparse +import io +import os +import sys +import unittest +from contextlib import redirect_stderr +from pathlib import Path +from unittest.mock import patch + +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from cli.connection_plan import ( # noqa: E402 + CommandPolicy, + ConnectionMode, +) + + +def _import_resolver(): + from cli.fpb_cli import resolve_connection_plan # noqa: E402 + + return resolve_connection_plan + + +def _ns(**kwargs): + """argparse.Namespace with defaults that mirror build_parser(). + + For convenience, ``server_url=URL`` is mapped to ``server=URL`` (URL is + one of the three forms ``-s`` accepts) so existing test bodies remain + readable. ``server_url_legacy`` exercises the deprecated --server-url path. + """ + if "server_url" in kwargs: + kwargs.setdefault("server", kwargs.pop("server_url")) + defaults = dict( + command="info", + command_policy=CommandPolicy.DEVICE, + server=None, + server_url_legacy=None, + no_discovery=False, + token=None, + port=None, + baudrate=115200, + direct=False, + verbose=False, + ) + defaults.update(kwargs) + return argparse.Namespace(**defaults) + + +def _fake_server(host="192.168.1.20", port=5500, version="1.6.6", auth="token"): + from cli.discover import FPBServer + + return FPBServer( + name=f"fake-{host}-{port}", + host=host, + port=port, + version=version, + auth=auth, + device="none", + path="/api", + url=f"http://{host}:{port}", + id=f"fpb:fake-{host}-{port}", + handle=f"{host}:{port}", + ) + + +class TestOfflineAndAdmin(unittest.TestCase): + """Policies that never produce a connection.""" + + def test_offline_command_returns_offline_plan(self): + resolve = _import_resolver() + plan = resolve(_ns(command="analyze", command_policy=CommandPolicy.OFFLINE)) + self.assertEqual(plan.mode, ConnectionMode.OFFLINE) + self.assertIsNone(plan.server_url) + + def test_server_admin_command_returns_offline_plan(self): + resolve = _import_resolver() + plan = resolve( + _ns(command="server-stop", command_policy=CommandPolicy.SERVER_ADMIN) + ) + self.assertEqual(plan.mode, ConnectionMode.OFFLINE) + self.assertIsNone(plan.server_url) + + +class TestDirectMode(unittest.TestCase): + """--direct produces a DIRECT plan with the user's --port.""" + + def test_direct_with_port(self): + resolve = _import_resolver() + plan = resolve(_ns(direct=True, port="/dev/ttyACM0", baudrate=921600)) + self.assertEqual(plan.mode, ConnectionMode.DIRECT) + self.assertEqual(plan.serial_port, "/dev/ttyACM0") + self.assertEqual(plan.baudrate, 921600) + self.assertIsNone(plan.server_url) + + +class TestInvalidFlagCombos(unittest.TestCase): + """Rejected at resolve time so the user sees one clear error, not a silent no-op.""" + + def test_direct_with_server_url_rejected(self): + from cli.fpb_cli import FPBCLIError + + resolve = _import_resolver() + with self.assertRaises(FPBCLIError) as cm: + resolve( + _ns(direct=True, server_url="http://1.2.3.4:5500", port="/dev/ttyACM0") + ) + self.assertIn("--direct", str(cm.exception)) + self.assertIn("--server-url", str(cm.exception)) + + def test_direct_without_port_for_device_command_rejected(self): + from cli.fpb_cli import FPBCLIError + + resolve = _import_resolver() + with self.assertRaises(FPBCLIError) as cm: + resolve(_ns(direct=True, port=None, command_policy=CommandPolicy.DEVICE)) + self.assertIn("--direct", str(cm.exception)) + self.assertIn("--port", str(cm.exception)) + + +class TestExplicitServerUrl(unittest.TestCase): + """--server-url and FPB_SERVER_URL bypass discovery.""" + + def test_explicit_localhost_url_local_mode(self): + resolve = _import_resolver() + plan = resolve(_ns(server_url="http://127.0.0.1:5500")) + self.assertEqual(plan.mode, ConnectionMode.LOCAL_PROXY) + self.assertEqual(plan.server_url, "http://127.0.0.1:5500") + self.assertEqual(plan.source, "flag") + + def test_explicit_remote_url_remote_mode(self): + resolve = _import_resolver() + plan = resolve(_ns(server_url="http://192.168.1.20:5500", token="t")) + self.assertEqual(plan.mode, ConnectionMode.REMOTE_PROXY) + self.assertEqual(plan.server_url, "http://192.168.1.20:5500") + self.assertEqual(plan.token, "t") + + @patch.dict(os.environ, {"FPB_SERVER_URL": "http://192.168.1.30:5501"}, clear=False) + def test_env_url_used_when_no_flag(self): + resolve = _import_resolver() + plan = resolve(_ns(server=None)) + self.assertEqual(plan.mode, ConnectionMode.REMOTE_PROXY) + self.assertEqual(plan.server_url, "http://192.168.1.30:5501") + self.assertEqual(plan.source, "legacy-env") + + @patch.dict(os.environ, {}, clear=True) + def test_explicit_url_does_not_call_discovery(self): + resolve = _import_resolver() + with patch("cli.fpb_cli.discover_sync") as mock_disc: + resolve(_ns(server_url="http://1.2.3.4:5500")) + mock_disc.assert_not_called() + + +class TestImplicitLocalShortCircuit(unittest.TestCase): + """No URL/env: prefer single PID server, then localhost probe, then mDNS.""" + + @patch.dict(os.environ, {}, clear=True) + def test_single_pid_server_short_circuits_discovery(self): + resolve = _import_resolver() + with patch( + "cli.fpb_cli.list_cli_servers", + return_value=[{"port": 5599, "pid": 1234}], + ), patch("cli.fpb_cli.discover_sync") as mock_disc: + plan = resolve(_ns()) + self.assertEqual(plan.mode, ConnectionMode.LOCAL_PROXY) + self.assertEqual(plan.server_url, "http://127.0.0.1:5599") + self.assertEqual(plan.source, "pid") + mock_disc.assert_not_called() + + @patch.dict(os.environ, {}, clear=True) + def test_localhost_default_probe_short_circuits_discovery(self): + resolve = _import_resolver() + with patch("cli.fpb_cli.list_cli_servers", return_value=[]), patch( + "cli.fpb_cli._localhost_status_ok", return_value=True + ), patch("cli.fpb_cli.discover_sync") as mock_disc: + plan = resolve(_ns()) + self.assertEqual(plan.mode, ConnectionMode.LOCAL_PROXY) + self.assertEqual(plan.server_url, "http://127.0.0.1:5500") + self.assertEqual(plan.source, "localhost-default") + mock_disc.assert_not_called() + + +class TestNoDiscoveryFlag(unittest.TestCase): + @patch.dict(os.environ, {}, clear=True) + def test_no_discovery_falls_back_to_default_localhost(self): + resolve = _import_resolver() + with patch("cli.fpb_cli.list_cli_servers", return_value=[]), patch( + "cli.fpb_cli._localhost_status_ok", return_value=False + ), patch("cli.fpb_cli.discover_sync") as mock_disc: + plan = resolve(_ns(no_discovery=True)) + self.assertEqual(plan.mode, ConnectionMode.LOCAL_PROXY) + self.assertEqual(plan.server_url, "http://127.0.0.1:5500") + mock_disc.assert_not_called() + + +class TestMdnsBranches(unittest.TestCase): + """When implicit localhost/PID short-circuits don't fire, run mDNS.""" + + @patch.dict(os.environ, {}, clear=True) + def test_zero_results_falls_back_to_localhost_default(self): + resolve = _import_resolver() + with patch("cli.fpb_cli.list_cli_servers", return_value=[]), patch( + "cli.fpb_cli._localhost_status_ok", return_value=False + ), patch("cli.fpb_cli.discover_sync", return_value=[]): + plan = resolve(_ns()) + self.assertEqual(plan.mode, ConnectionMode.LOCAL_PROXY) + self.assertEqual(plan.server_url, "http://127.0.0.1:5500") + + @patch.dict(os.environ, {}, clear=True) + def test_one_local_result_returns_local_proxy(self): + resolve = _import_resolver() + with patch("cli.fpb_cli.list_cli_servers", return_value=[]), patch( + "cli.fpb_cli._localhost_status_ok", return_value=False + ), patch( + "cli.fpb_cli.discover_sync", + return_value=[_fake_server(host="127.0.0.1", port=5500)], + ): + plan = resolve(_ns()) + self.assertEqual(plan.mode, ConnectionMode.LOCAL_PROXY) + self.assertEqual(plan.server_url, "http://127.0.0.1:5500") + self.assertEqual(plan.source, "mdns") + + @patch.dict(os.environ, {}, clear=True) + def test_one_remote_result_returns_remote_proxy(self): + resolve = _import_resolver() + with patch("cli.fpb_cli.list_cli_servers", return_value=[]), patch( + "cli.fpb_cli._localhost_status_ok", return_value=False + ), patch( + "cli.fpb_cli.discover_sync", + return_value=[_fake_server(host="192.168.1.20", port=5500)], + ): + plan = resolve(_ns()) + self.assertEqual(plan.mode, ConnectionMode.REMOTE_PROXY) + self.assertEqual(plan.server_url, "http://192.168.1.20:5500") + + @patch.dict(os.environ, {}, clear=True) + def test_two_results_raises_ambiguous_server_error(self): + from cli.fpb_cli import AmbiguousServerError + + resolve = _import_resolver() + servers = [ + _fake_server(host="10.0.0.10", port=5500), + _fake_server(host="10.0.0.11", port=5500), + ] + with patch("cli.fpb_cli.list_cli_servers", return_value=[]), patch( + "cli.fpb_cli._localhost_status_ok", return_value=False + ), patch("cli.fpb_cli.discover_sync", return_value=servers): + with self.assertRaises(AmbiguousServerError) as cm: + resolve(_ns()) + self.assertEqual(cm.exception.exit_code, 2) + self.assertIn("10.0.0.10:5500", str(cm.exception)) + self.assertIn("10.0.0.11:5500", str(cm.exception)) + self.assertIn("-s ", str(cm.exception)) + + +class TestPlanProperties(unittest.TestCase): + """Plans expose allow_launch / allow_direct_fallback consistently.""" + + def test_local_plan_with_serial_port_allows_launch_and_direct_fallback(self): + resolve = _import_resolver() + plan = resolve(_ns(server_url="http://127.0.0.1:5500", port="/dev/ttyACM0")) + self.assertEqual(plan.mode, ConnectionMode.LOCAL_PROXY) + self.assertEqual(plan.serial_port, "/dev/ttyACM0") + self.assertTrue(plan.allow_launch) + self.assertTrue(plan.allow_direct_fallback) + + def test_remote_plan_never_allows_launch_or_direct(self): + resolve = _import_resolver() + plan = resolve(_ns(server_url="http://192.168.1.20:5500", port="/dev/ttyACM0")) + self.assertEqual(plan.mode, ConnectionMode.REMOTE_PROXY) + self.assertFalse(plan.allow_launch) + self.assertFalse(plan.allow_direct_fallback) + + +if __name__ == "__main__": + unittest.main() diff --git a/Tools/WebServer/tests/test_server_proxy.py b/Tools/WebServer/tests/test_server_proxy.py index e75e058e..4fb1d305 100644 --- a/Tools/WebServer/tests/test_server_proxy.py +++ b/Tools/WebServer/tests/test_server_proxy.py @@ -14,8 +14,10 @@ from urllib.error import URLError sys.path.insert(0, str(Path(__file__).parent.parent)) +sys.path.insert(0, str(Path(__file__).parent)) from cli.server_proxy import ServerProxy, ProxyAuthError, DEFAULT_SERVER_URL +from fixtures.mock_http import MockHTTPHandler as _MockHTTPHandler class TestServerProxyInit(unittest.TestCase): @@ -58,53 +60,6 @@ def test_with_token_existing_query(self): self.assertEqual(url, "http://localhost:5500/api/status?foo=bar&token=tok123") -class _MockHTTPHandler(http.server.BaseHTTPRequestHandler): - """Simple mock HTTP handler for testing.""" - - # Class-level response configuration - responses = {} - sse_responses = {} - - def do_GET(self): - path = self.path.split("?")[0] # Strip query params - if path in self.responses: - body = json.dumps(self.responses[path]).encode() - self.send_response(200) - self.send_header("Content-Type", "application/json") - self.end_headers() - self.wfile.write(body) - else: - self.send_response(404) - - def do_POST(self): - content_length = int(self.headers.get("Content-Length", 0)) - if content_length: - self.rfile.read(content_length) - path = self.path.split("?")[0] - # SSE responses for streaming endpoints - if path in self.sse_responses: - sse_data = self.sse_responses[path] - body = "" - for event in sse_data: - body += f"data: {json.dumps(event)}\n\n" - self.send_response(200) - self.send_header("Content-Type", "text/event-stream") - self.end_headers() - self.wfile.write(body.encode()) - elif path in self.responses: - resp_body = json.dumps(self.responses[path]).encode() - self.send_response(200) - self.send_header("Content-Type", "application/json") - self.end_headers() - self.wfile.write(resp_body) - else: - self.send_response(404) - self.end_headers() - - def log_message(self, format, *args): - pass # Suppress log output - - class TestServerProxyWithMockServer(unittest.TestCase): """Integration tests with a real HTTP server.""" diff --git a/Tools/requirements.txt b/Tools/requirements.txt index 4a3118d9..347b6db3 100644 --- a/Tools/requirements.txt +++ b/Tools/requirements.txt @@ -7,3 +7,4 @@ pyserial pygdbmi watchdog tree_sitter +zeroconf>=0.131