diff --git a/CHANGELOG.md b/CHANGELOG.md index 7f8bf6fa..5fe9c5f2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,45 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). --- +## v26.06.113 (2026-06-17) + +### Added + +- **Server-layer observability.** Observability is no longer only application-layer + (the `http_server_requests_seconds` filter, tracing/correlation, process metrics): + pyfly now emits metrics about the ASGI **server itself** across Uvicorn, Granian, + and Hypercorn. A pure-ASGI `ServerMetricsASGIMiddleware` (the uniform primary + source, running in every worker) emits `server_active_connections`, + `server_in_flight_requests`, and `server_requests_total`; a `ServerMetricsBinder` + bound from the in-worker ASGI lifespan emits `server_workers`, + `server_uptime_seconds`, and `server_started_total` / `server_stopped_total`; and + a best-effort `ServerStatsPort` surfaces Uvicorn's true socket count + (`server_native_connections`) on the in-process `serve_async` path. Every meter is + labeled `server` and `worker_pid`. +- **Correct multi-worker aggregation.** With `workers > 1`, `pyfly run` enables + `prometheus_client` multiprocess mode (sets `PROMETHEUS_MULTIPROC_DIR` before + forking), so a single `/actuator/prometheus` scrape aggregates across all workers + via `MultiProcessCollector` — this also fixes the previous per-worker gap for + `http_server_requests_*`. +- **Live admin Observability dashboard.** A new real-time **Observability** view + (under Monitoring) shows server workers, uptime, active connections, in-flight + requests, requests/sec, a per-worker breakdown, and worker lifecycle, with links + to the Metrics and Traces views. Backed by `GET /admin/api/observability` and the + `observability` SSE stream. 
+- **Configuration.** New `pyfly.server.observability.*` keys — `enabled` + (default `true`, activated by the web/core starters), `sample-interval-seconds` + (`5.0`), and `access-log` (`false`, opt-in). Requires the observability extra + (`prometheus_client`); degrades to a no-op without it. +- **Local observability stack.** `docker-compose.yml` gained loopback-bound + Prometheus + Grafana services (config in `ops/prometheus/prometheus.yml`) that + scrape `/actuator/prometheus`. + +> Scope: gunicorn is intentionally not added (the stack stays async-only ASGI: +> Granian > Uvicorn > Hypercorn), but the `ServerStatsPort` + multiprocess design is +> gunicorn-ready for a future adapter. + +--- + ## v26.06.112 (2026-06-16) ### Changed diff --git a/README.md b/README.md index 211c6ed1..bf94be03 100644 --- a/README.md +++ b/README.md @@ -1057,7 +1057,7 @@ PyFly ships with **39 fully-implemented modules** organized into five layers — | Module | Description | Firefly Java Equivalent | |--------|-------------|------------------------| | **AOP** | Aspect-oriented programming | Spring AOP | -| **Observability** | Prometheus metrics, OpenTelemetry tracing | `fireflyframework-observability` | +| **Observability** | Prometheus metrics, OpenTelemetry tracing, server-layer metrics (workers, connections, in-flight requests, uptime) across Uvicorn/Granian/Hypercorn with multi-worker Prometheus aggregation, surfaced in a live admin Observability dashboard | `fireflyframework-observability` | | **Actuator** | Health checks, monitoring endpoints | `fireflyframework-starter-core` (actuator) | | **Admin** | Embedded management dashboard with 15 views, SSE streams, server mode fleet monitoring | Spring Boot Admin | | **Testing** | Test fixtures and assertions | Spring Test | diff --git a/ROADMAP.md b/ROADMAP.md index ccb83b69..51e416d4 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -34,6 +34,16 @@ This wave replaced mock-only adapter coverage with provable real-backend correct - *Plugins:* 
`PluginState` lifecycle model (LOADED/STARTED/STOPPED/FAILED), per-plugin start/stop with dependency cascade, typed `PluginException` hierarchy. - *Rule Engine:* Rich operator set (`between`, `contains`, `starts_with`, `ends_with`, `exists`, `is_null`, `is_empty`); fluent builder DSL (`pyfly.rule_engine.builder`); `RuleSetLoader.from_json`; `RuleSetValidator`; hexagonal `RuleEnginePort` + `ActionHandler` SPI; `RuleEngineService` facade; `EvaluationMode.ALL`/`FIRST_MATCH`; pluggable action handlers. +### Server-layer observability ✅ **Delivered (v26.06.113)** + +Observability was previously application-layer only (the `MetricsFilter` `http_server_requests_seconds`, tracing/correlation filters, `process_metrics`). This release adds metrics about the ASGI **server** itself (uvicorn, granian, hypercorn), written to the Prometheus registry and auto-exposed at `/actuator/prometheus` and `/actuator/metrics`. Every meter is labeled `server` (server type) and `worker_pid`. + +Three cooperating mechanisms supply the data. A pure-ASGI `ServerMetricsASGIMiddleware` wraps the app at the outermost layer and is the primary source — it runs in every worker for every server and worker count, emitting `server_active_connections`, `server_in_flight_requests`, and `server_requests_total`. A `ServerMetricsBinder`, started from the in-worker ASGI lifespan, emits `server_workers`, `server_uptime_seconds` (since this worker bound, distinct from `process_uptime_seconds`), `server_started_total`, `server_stopped_total`, and optionally `server_native_connections`. A best-effort `ServerStatsPort` lets each adapter enrich the data on the in-process `serve_async` path — the uvicorn adapter surfaces its true socket connection count (incl. idle keep-alive) and total requests via `server_native_connections`; granian/hypercorn report workers + uptime only. 
+ +Multi-worker scrapes aggregate: `pyfly run` sets `PROMETHEUS_MULTIPROC_DIR` before forking workers, and `/actuator/prometheus` merges all workers via `MultiProcessCollector`, so the `server_*` and `http_server_requests_*` meters reflect every worker (custom Python collectors such as `process_*`/`system_*` are not aggregated by multiprocess mode). + +New config keys: `pyfly.server.observability.enabled` (default `true`; enabled by the web and core starters), `pyfly.server.observability.sample-interval-seconds` (default `5.0`), and `pyfly.server.observability.access-log` (default `false`). Requires the observability extra (`prometheus_client`); degrades to a no-op without it. The admin dashboard gains a live **Observability** section under Monitoring — stat cards, rolling charts, and a per-worker breakdown table — backed by `GET /admin/api/observability` and SSE `/admin/api/sse/observability`. gunicorn is not added in this release (the stack stays async-only ASGI: granian > uvicorn > hypercorn), but the `ServerStatsPort` + multiprocess design is gunicorn-ready. The local `docker-compose.yml` gained prometheus + grafana services scraping `/actuator/prometheus`. + --- ## Phase 1 — Core Distributed Patterns ✅ **Complete (v26.05.01)** diff --git a/book/manuscript-es/15-observability.md b/book/manuscript-es/15-observability.md index 92d97458..a664ac8d 100644 --- a/book/manuscript-es/15-observability.md +++ b/book/manuscript-es/15-observability.md @@ -496,6 +496,114 @@ Apunta tus `scrape_configs` de Prometheus a `/actuator/prometheus` y todas las m --- +## Observabilidad de la capa de servidor + +!!! note "Novedad en v26.6.113" + Hasta ahora toda la observabilidad de este capítulo ha sido de la **capa de + aplicación**: el `MetricsFilter` que mide `http_server_requests_seconds`, los + filtros de trazas/correlación y `process_metrics`. 
Esta versión añade métricas + sobre el **servidor en sí** —el servidor ASGI que ejecuta tu aplicación + (uvicorn, granian o hypercorn)— para que puedas ver conexiones, peticiones en + curso, workers y tiempo de actividad junto a las métricas de negocio. + +Todas estas métricas se escriben en el mismo registro de Prometheus y se exponen +automáticamente en `/actuator/prometheus`, sin código adicional. Tres mecanismos +cooperan para producirlas: + +1. **Un middleware ASGI puro** (`ServerMetricsASGIMiddleware`, en + `pyfly/web/adapters/starlette/asgi_server_metrics.py`) envuelve la aplicación en + la **capa más externa** y es la fuente **primaria**: corre en cada worker, para + cada servidor y cualquier número de workers. Emite `server_active_connections`, + `server_in_flight_requests` y `server_requests_total`. +2. **Un `ServerMetricsBinder`** (`pyfly/observability/server_metrics.py`), arrancado + desde el lifespan ASGI dentro del worker (junto a `register_process_metrics` y el + `ManagementServer`), emite `server_workers` (a partir de la variable de entorno + `_PYFLY_WORKERS` que fija `pyfly run`), `server_uptime_seconds` (desde que el + worker se vinculó al socket), `server_started_total`, `server_stopped_total` y, + opcionalmente, `server_native_connections`. +3. **Un `ServerStatsPort` de mejor esfuerzo** (`pyfly/server/ports/server_stats.py`) + implementado por cada adaptador: en la ruta en proceso `serve_async`, el + adaptador de uvicorn aflora su recuento real de conexiones de socket y el total + de peticiones desde `uvicorn.Server.server_state`; granian y hypercorn solo + reportan workers y tiempo de actividad (runtime en Rust / sin handle), de modo que + ahí los campos de conexión son `None`. + +!!! note "¿Por qué no leer simplemente las estadísticas nativas del servidor?" 
+ En la ruta de producción `pyfly run`, `uvicorn.run(workers=N)` bifurca + subprocesos worker que construyen cada uno su **propio** servidor; el bean del + adaptador del worker no es el objeto que está sirviendo, así que `server_state` + queda inalcanzable entre procesos. Por eso el middleware ASGI —que sí corre + dentro del worker— es la fuente primaria uniforme; las estadísticas nativas son + un enriquecimiento de mejor esfuerzo. + +### Catálogo de métricas + +Todos los nombres son nombres de Prometheus y cada medidor lleva las etiquetas +`server` (el tipo de servidor) y `worker_pid`: + +| Métrica | Tipo | Qué mide | +|---|---|---| +| `server_active_connections` | gauge | Conexiones ASGI abiertas (http + websocket); aproximado, **no** sockets reales (los sockets keep-alive ociosos que retiene el servidor son invisibles a ASGI) | +| `server_in_flight_requests` | gauge | Peticiones http que se están atendiendo ahora mismo | +| `server_requests_total` | counter | Peticiones http completadas en la capa de servidor | +| `server_workers` | gauge | Procesos worker configurados | +| `server_uptime_seconds` | gauge | Segundos desde que este worker se vinculó al socket (distinto de `process_uptime_seconds`) | +| `server_started_total` / `server_stopped_total` | counter | Ciclo de vida del worker | +| `server_native_connections` | gauge | Recuento **real** de conexiones de socket de uvicorn, incluido keep-alive ocioso (solo en la ruta `serve_async`; ausente en granian/hypercorn) | + +### Modo multi-worker + +Cuando `workers > 1`, el **modo multiproceso de `prometheus_client`** se activa +automáticamente: `pyfly run` fija `PROMETHEUS_MULTIPROC_DIR` antes de bifurcar los +workers (`pyfly/observability/multiprocess.py`), cada worker escribe sus ficheros +mmap y `/actuator/prometheus` agrega todos los workers mediante +`MultiProcessCollector`. Así, una sola recolección refleja **todos** los workers. + +!!! 
warning "Limitación del modo multiproceso" + El modo multiproceso solo agrega valores de `Counter`, `Gauge`, `Histogram` y + `Summary`. Los colectores Python personalizados (las métricas `process_*` y + `system_*`) **no** se agregan entre workers. Los medidores `server_*` y + `http_server_requests_*` sí se agregan correctamente. + +### Configuración + +```yaml +pyfly: + server: + observability: + enabled: true # default; lo activan los starters web y core + sample-interval-seconds: 5.0 # default + access-log: false # default; logging de acceso nativo opt-in +``` + +`pyfly.server.observability.enabled` está activado por defecto por los starters web +y core, igual que `pyfly.observability.metrics.enabled`. Requiere el extra +`observability` (`prometheus_client`); sin él, degrada a no-op. + +### Exposición y panel + +Los medidores `server_*` aparecen en `/actuator/prometheus` y `/actuator/metrics`. +El panel de administración gana una nueva sección **Observability** en vivo (dentro +de Monitoring): tarjetas de estadística (workers, tiempo de actividad, conexiones +activas, peticiones en curso, peticiones/segundo), gráficos en movimiento, una tabla +de desglose por worker y enlaces a las vistas de Metrics y Traces. Está respaldada +por `GET /admin/api/observability` y el SSE `/admin/api/sse/observability`. + +!!! note "Alcance: solo ASGI por ahora" + Esta versión **no** añade gunicorn: la pila sigue siendo asíncrona, solo ASGI + (granian > uvicorn > hypercorn). Aun así, el diseño de `ServerStatsPort` y del + modo multiproceso está preparado para gunicorn de cara a un futuro adaptador. + +### Stack local — Prometheus y Grafana + +Para que puedas ver estas métricas en vivo, `docker-compose.yml` incorpora servicios +de **prometheus** y **grafana** que recolectan `/actuator/prometheus` (la +configuración vive en `ops/prometheus/prometheus.yml`). 
Levanta el stack, lanza unas +cuantas peticiones contra la API de negocio y observa cómo `server_in_flight_requests` +y `server_requests_total` se mueven en Grafana. + +--- + ## Trazas distribuidas ### @span — decorador de span de OpenTelemetry diff --git a/book/manuscript/15-observability.md b/book/manuscript/15-observability.md index 4d16a3ec..756cbcfd 100644 --- a/book/manuscript/15-observability.md +++ b/book/manuscript/15-observability.md @@ -488,6 +488,180 @@ Point your Prometheus `scrape_configs` at `/actuator/prometheus` and all `Metric --- +## Server-layer observability + +Everything you metered so far lives *inside* your application: `@timed` wraps a +handler, `@counted` increments on a method call, `MetricsFilter` records +`http_server_requests_seconds` as a request passes through the filter chain, and +`process_metrics` samples CPU and memory. All of that describes the work Lumen +does — but none of it describes the **server** running Lumen. How many +connections is uvicorn holding open right now? How many requests are in flight +*at the server layer*, before they ever reach a handler? How long has this worker +been bound, as distinct from the Python process uptime? Until now those numbers +were invisible. + +!!! note "New in v26.6.113" + Server-layer observability adds a family of `server_*` metrics about the + ASGI server itself — uvicorn, granian, or hypercorn — alongside the existing + application-layer meters. They are on by default (enabled by the web and core + starters, mirroring `pyfly.observability.metrics.enabled`), appear at + `/actuator/prometheus` with no extra wiring, and feed a new live + **Observability** view in the admin dashboard. + +### How it works — three cooperating sources + +You might expect PyFly to simply read the server's own statistics. On the +in-process `serve_async` path it does, but that path is not how you run in +production. 
`pyfly run --server uvicorn --workers 4` calls `uvicorn.run(workers=N)`, +which **forks worker subprocesses** — each builds its *own* server object in its +own process. The adapter bean your worker holds is not the object actually +serving traffic, so its `server_state` is unreachable across the process +boundary. PyFly therefore layers three mechanisms, each writing to the same +Prometheus registry that `/actuator/prometheus` already exposes: + +1. A pure-ASGI middleware (`ServerMetricsASGIMiddleware`) wraps the app at the + **outermost** layer. This is the **primary** source: it runs in every worker, + for every server type and worker count, and it sees every connection and + request before any handler does. It emits `server_active_connections`, + `server_in_flight_requests`, and `server_requests_total`. +2. A `ServerMetricsBinder`, started from the in-worker ASGI lifespan beside + `register_process_metrics` and the `ManagementServer`, emits `server_workers` + (read from the `_PYFLY_WORKERS` env var that `pyfly run` sets), + `server_uptime_seconds` (since this worker bound), `server_started_total`, + `server_stopped_total`, and optionally `server_native_connections`. +3. A best-effort `ServerStatsPort`, implemented per adapter. On the in-process + `serve_async` path the uvicorn adapter surfaces its *true* socket connection + count and total request count from `uvicorn.Server.server_state`. The granian + and hypercorn adapters report workers and uptime only — granian's Rust runtime + and hypercorn's lack of a server handle leave the connection fields `None` — + so native stats are enrichment, never the foundation. + +The middleware is the uniform floor; native stats are best-effort icing on top. +That split is why the numbers are always present, regardless of how many workers +you fork or which server you chose. + +### The metric catalog + +Every meter below carries two labels: `server` (the server type) and +`worker_pid` (the process that emitted it). 
Prometheus names follow the usual +exposition conventions. + +| Metric | Type | Meaning | +|---|---|---| +| `server_active_connections` | gauge | Open ASGI connections (HTTP + WebSocket). Approximate — **not** true sockets; idle keep-alive sockets the server holds are invisible to the ASGI layer. | +| `server_in_flight_requests` | gauge | HTTP requests currently being handled. | +| `server_requests_total` | counter | Completed HTTP requests at the server layer. | +| `server_workers` | gauge | Configured worker processes. | +| `server_uptime_seconds` | gauge | Seconds since this worker bound — distinct from `process_uptime_seconds`. | +| `server_started_total` | counter | Worker-start lifecycle events. | +| `server_stopped_total` | counter | Worker-stop lifecycle events. | +| `server_native_connections` | gauge | uvicorn's *true* socket connection count, including idle keep-alive. Present only on the `serve_async` path; absent for granian and hypercorn. | + +!!! note "Two kinds of connection count, two kinds of uptime" + `server_active_connections` counts what the ASGI layer can see; an idle HTTP + keep-alive socket holding no live connection is invisible to it, so this + gauge under-counts what the kernel sees. `server_native_connections`, when + available, is the server's own socket count and *does* include those idle + keep-alives — which is exactly why it usually reads higher. Likewise, + `server_uptime_seconds` measures how long *this worker* has been bound, while + `process_uptime_seconds` measures the Python process; after a worker respawn + they diverge. + +### Enabling it + +Server-layer observability is on by default, so you do not have to do anything to +get the `server_*` meters. 
The knobs let you tune the sample cadence, opt into +native access logging, or turn the whole subsystem off: + +```yaml +pyfly: + server: + observability: + enabled: true # default; mirrors pyfly.observability.metrics.enabled + sample-interval-seconds: 5.0 # how often the binder samples gauges + access-log: false # opt-in native server access logging +``` + +Like the rest of the metrics stack, it requires the `observability` extra +(`prometheus_client`); without it the subsystem degrades to a silent no-op rather +than failing startup. Remember that `server_*` is exposed on the **management +port** (`9090`) alongside the other actuator metrics — and that `prometheus` +must be in your `pyfly.management.endpoints.web.exposure.include` list for the +scrape endpoint to appear. + +**Run it — scrape the server metrics.** Start Lumen and scrape the management +port, filtering for the new family: + +```bash +uv run pyfly run --server uvicorn +curl -s localhost:9090/actuator/prometheus | grep '^server_' +``` + +You should see exposition lines like these (your PIDs and numbers will differ): + +``` +server_workers{server="uvicorn",worker_pid="48211"} 1.0 +server_uptime_seconds{server="uvicorn",worker_pid="48211"} 37.4 +server_active_connections{server="uvicorn",worker_pid="48211"} 2.0 +server_in_flight_requests{server="uvicorn",worker_pid="48211"} 1.0 +server_requests_total{server="uvicorn",worker_pid="48211"} 128.0 +server_started_total{server="uvicorn",worker_pid="48211"} 1.0 +server_native_connections{server="uvicorn",worker_pid="48211"} 5.0 +``` + +Notice `server_native_connections` reads higher than `server_active_connections` +here: the extra sockets are idle keep-alive connections the kernel holds that the +ASGI layer never sees. Drive a deposit on `8080` and re-scrape to watch +`server_requests_total` climb. 
Switch to `--server granian` and re-scrape: the +gauges and counters are still there, but `server_native_connections` is gone: +granian's Rust runtime offers no handle to read it. + +### Multi-worker aggregation + +With `--workers 1` a scrape is trivially complete: one worker, one in-process +registry. With `workers > 1` each forked worker would otherwise expose only +*its own* numbers on *its own* listener, and a single scrape would see just one +worker. PyFly closes that gap by auto-enabling `prometheus_client`'s multiprocess +mode. Before forking, `pyfly run` sets `PROMETHEUS_MULTIPROC_DIR` +(`pyfly/observability/multiprocess.py`); each worker writes its metrics to mmap +files in that directory; and `/actuator/prometheus` aggregates across every +worker through a `MultiProcessCollector`. The upshot: **one scrape reflects the +whole fleet of workers**, and the `worker_pid` label still lets you break the +numbers down per process. + +!!! warning "Custom collectors are not aggregated" + Multiprocess mode aggregates only `Counter`, `Gauge`, `Histogram`, and + `Summary` values. Custom Python collectors (the `process_*` and `system_*` + metrics) are *not* merged across workers, because the registry cannot + serialize arbitrary collector state through the mmap files. The `server_*` + and `http_server_requests_*` meters are ordinary counters and gauges, so they + aggregate correctly; treat the per-process CPU and memory figures as + single-worker samples when reading a multi-worker scrape. + +### The admin Observability view + +Server metrics are also surfaced in the dashboard, under **Monitoring**, in a new +live **Observability** view. It opens with stat cards for workers, uptime, active +connections, in-flight requests, and requests-per-second; below them sit rolling +charts of the same series, and a per-worker breakdown table keyed on `worker_pid` +so you can spot one hot or stalled worker in a fleet. 
The view links across to the +existing Metrics and Traces views for drill-down. It is backed by +`GET /admin/api/observability` for the initial snapshot and the SSE stream +`/admin/api/sse/observability` for live updates — the same push-not-poll model the +rest of the dashboard uses. + +!!! note "Scope: async-only, gunicorn-ready" + This release keeps the server stack async-only ASGI — granian, then uvicorn, + then hypercorn — and does **not** add gunicorn. The `ServerStatsPort` and the + multiprocess design are deliberately gunicorn-ready, though, so a future + sync-worker adapter can plug in without reworking the metric plumbing. For + local development, `docker-compose.yml` now ships Prometheus and Grafana + services that scrape `/actuator/prometheus` (config in + `ops/prometheus/prometheus.yml`), so you can watch the `server_*` series on a + dashboard end to end. + +--- + ## Distributed tracing ### @span — OpenTelemetry span decorator diff --git a/docker-compose.yml b/docker-compose.yml index 450a08d7..7a895710 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -55,3 +55,27 @@ services: KEYCLOAK_ADMIN: admin KEYCLOAK_ADMIN_PASSWORD: admin ports: ["8080:8080"] + # Observability stack — scrapes a locally-running PyFly app's + # /actuator/prometheus (server_* + http_server_requests_* meters) and renders + # them in Grafana. Start with: docker compose up -d prometheus grafana + # + # LOCAL-DEV ONLY: both services bind to 127.0.0.1 (loopback) so they are not + # reachable from other hosts. Prometheus has no auth; Grafana allows anonymous + # READ-ONLY (Viewer) access for convenience. For the Grafana admin login, set + # GRAFANA_ADMIN_PASSWORD in the environment (otherwise Grafana's default applies + # and must be changed on first login). Do NOT expose these ports publicly. 
+ prometheus: + image: prom/prometheus:v2.54.1 + volumes: + - ./ops/prometheus/prometheus.yml:/etc/prometheus/prometheus.yml:ro + extra_hosts: + - "host.docker.internal:host-gateway" # reach the app on the host (Linux) + ports: ["127.0.0.1:9091:9090"] + grafana: + image: grafana/grafana:11.2.0 + environment: + GF_AUTH_ANONYMOUS_ENABLED: "true" + GF_AUTH_ANONYMOUS_ORG_ROLE: Viewer + GF_SECURITY_ADMIN_PASSWORD: ${GRAFANA_ADMIN_PASSWORD:-} + ports: ["127.0.0.1:3000:3000"] + depends_on: [prometheus] diff --git a/docs/modules/admin.md b/docs/modules/admin.md index 6fdb07c6..256c8300 100644 --- a/docs/modules/admin.md +++ b/docs/modules/admin.md @@ -66,7 +66,7 @@ http://localhost:8080/admin That is all. The dashboard auto-discovers beans, health indicators, loggers, scheduled tasks, HTTP mappings, caches, CQRS handlers, transactions, and -metrics from the running `ApplicationContext` and presents them in 15 built-in +metrics from the running `ApplicationContext` and presents them in 16 built-in views with real-time updates. --- @@ -212,7 +212,7 @@ pyfly: ## Built-in Views -The dashboard ships with 15 views organized into four sections in the sidebar. +The dashboard ships with 16 views organized into four sections in the sidebar. Each view has a corresponding REST API endpoint and (where applicable) an SSE stream for live updates. @@ -237,6 +237,7 @@ stream for live updates. | View | Sidebar ID | Description | |------|-----------|-------------| | **Metrics** | `metrics` | Built-in process metrics (CPU, memory, threads, GC, uptime) always available without external dependencies. Optional Prometheus metrics included when `prometheus_client` is installed. Selecting a numeric metric opens a **live time-series trend** — a rolling chart polled at the configured refresh interval with a Value / Rate (Δ/s) toggle, pause/resume, Current/Min/Max/Avg summary, a measurement selector for multi-series (tagged) metrics, and a live-refreshing measurements table. 
Non-numeric metrics show a snapshot instead. | +| **Observability** | `observability` | Live server-layer observability for the ASGI server (uvicorn, granian, hypercorn). Stat cards for workers, uptime, active connections, in-flight requests, and requests/sec, plus rolling charts and a per-worker breakdown table. Shows worker lifecycle (started/stopped) and links across to the Metrics and Traces views. Backed by the `server_*` meters exposed at `/actuator/prometheus`. Real-time SSE updates. | | **Scheduled Tasks** | `scheduled` | All `@scheduled` tasks with cron expressions, fixed-rate/delay configuration, and execution status. | | **HTTP Traces** | `traces` | Recent HTTP request/response traces captured by `TraceCollectorFilter`. **Live request analytics**: total requests, average/max latency, error rate (4xx+5xx), a status-code mix bar (2xx/3xx/4xx/5xx), a latency-distribution histogram and latency percentiles (p50/p90/p95/p99) — all updating live as requests arrive. Shows method, path, status code, duration, query string, client host, content type, user agent, and response content-length. Click-to-detail panel. Status code filter pills (All, 2xx, 3xx, 4xx, 5xx). Real-time SSE for new traces. Client buffer bounded to the 500-entry ring buffer. | @@ -270,6 +271,7 @@ following SSE endpoints are available: | `GET /admin/api/sse/traces` | `trace` | Emits individual new HTTP trace events as they are captured by the `TraceCollectorFilter`. Polled every 2 seconds. | | `GET /admin/api/sse/logfile` | `log` | Emits new log records captured by the `AdminLogHandler`. Uses incremental polling (records with id > last seen) every 1 second. Each event contains `id`, `timestamp`, `level`, `logger`, `message`, `context`, and `thread`. | | `GET /admin/api/sse/beans` | `beans` | Emits the bean registry snapshot at each interval (poll interval is `refresh_interval / 1000` seconds) so the Beans view stays live. 
| +| `GET /admin/api/sse/observability` | `observability` | Emits a server-layer observability snapshot (workers, uptime, active connections, in-flight requests, requests/sec, per-worker breakdown) at each interval so the Observability view stays live. | Each SSE stream sends JSON payloads in the standard `data:` format with appropriate `Cache-Control: no-cache` and `X-Accel-Buffering: no` headers for @@ -306,6 +308,7 @@ All API endpoints return JSON responses. The base path defaults to `/admin/api` | `POST` | `/admin/api/loggers/{name}` | Set logger level. Body: `{"level": "DEBUG"}`. Valid levels: `TRACE`, `DEBUG`, `INFO`, `WARNING`, `ERROR`, `CRITICAL`, `OFF` (an unknown level returns HTTP 400). `TRACE` and `OFF` are applied as numeric level 5 and a disabled logger respectively. | | `GET` | `/admin/api/metrics` | List metric names. | | `GET` | `/admin/api/metrics/{name}` | Metric detail by name. | +| `GET` | `/admin/api/observability` | Server-layer observability snapshot: workers, uptime, active connections, in-flight requests, requests/sec, and per-worker breakdown. | | `GET` | `/admin/api/scheduled` | List scheduled tasks. | | `GET` | `/admin/api/mappings` | List HTTP route mappings. | | `GET` | `/admin/api/caches` | Cache stats: adapter type, entry count, key list. | @@ -328,6 +331,7 @@ All API endpoints return JSON responses. The base path defaults to `/admin/api` | `GET` | `/admin/api/sse/traces` | Real-time HTTP trace stream. | | `GET` | `/admin/api/sse/logfile` | Real-time log record stream. | | `GET` | `/admin/api/sse/beans` | Real-time bean registry stream. | +| `GET` | `/admin/api/sse/observability` | Real-time server-layer observability stream. 
| ### Instance Registry Endpoints (Server Mode) diff --git a/docs/modules/observability.md b/docs/modules/observability.md index 670181bd..3e6d487a 100644 --- a/docs/modules/observability.md +++ b/docs/modules/observability.md @@ -17,24 +17,30 @@ logging -- along with a health check system for readiness and liveness probes. - [@timed Decorator](#timed-decorator) - [@counted Decorator](#counted-decorator) - [Prometheus Integration](#prometheus-integration) -3. [Tracing](#tracing) +3. [Server-layer metrics](#server-layer-metrics) + - [Metric Catalog](#metric-catalog) + - [How It Works](#how-it-works) + - [Multi-Worker and Multiprocess Mode](#multi-worker-and-multiprocess-mode) + - [Configuration](#server-observability-configuration) + - [Exposition and Dashboard](#exposition-and-dashboard) +4. [Tracing](#tracing) - [@span Decorator](#span-decorator) - [Error Recording](#error-recording) - [OpenTelemetry Integration](#opentelemetry-integration) - [Distributed Trace Propagation](#distributed-trace-propagation) -4. [Logging](#logging) +5. [Logging](#logging) - [Quick Start with get_logger](#quick-start-with-get_logger) - [LoggingPort Protocol](#loggingport-protocol) - [StructlogAdapter](#structlogadapter) - [Structured Logging with Key-Value Pairs](#structured-logging-with-key-value-pairs) - [Correlation IDs](#correlation-ids) -5. [Health Checks](#health-checks) +6. [Health Checks](#health-checks) - [HealthAggregator](#healthaggregator) -6. [Auto-Configuration](#auto-configuration) -7. [Configuration](#configuration) +7. [Auto-Configuration](#auto-configuration) +8. [Configuration](#configuration) - [Logging Settings](#logging-settings) - [Metrics and Actuator Settings](#metrics-and-actuator-settings) -8. [Complete Example](#complete-example) +9. [Complete Example](#complete-example) --- @@ -317,6 +323,115 @@ work without modification. 
--- +## Server-layer metrics + +The metrics above are **application-layer** — the `http_server_requests_seconds` +timer emitted by `MetricsFilter`, the tracing/correlation filters, and the +`process_metrics`. As of **v26.06.113**, PyFly also emits **server-layer** metrics +about the ASGI server itself (uvicorn, granian, hypercorn), so you can see open +connections, in-flight requests, worker count, and uptime independently of any +single request. All server meters are written to the same Prometheus registry and +auto-exposed at `/actuator/prometheus`. See the [Server Module Guide](server.md) +for the server adapters these metrics describe. + +### Metric Catalog + +Every server meter is labeled `server` (the server type) and `worker_pid`. + +| Metric | Type | Meaning | +|------------------------------|---------|------------------------------------------------------------------------------------------------------------------| +| `server_active_connections` | gauge | Open ASGI connections (http + websocket). Approximate — **not** true sockets, since idle keep-alive sockets the server holds are invisible to ASGI. | +| `server_in_flight_requests` | gauge | HTTP requests currently being handled. | +| `server_requests_total` | counter | Completed HTTP requests at the server layer. | +| `server_workers` | gauge | Configured worker processes. | +| `server_uptime_seconds` | gauge | Seconds since this worker bound (distinct from `process_uptime_seconds`). | +| `server_started_total` | counter | Worker started (lifecycle). | +| `server_stopped_total` | counter | Worker stopped (lifecycle). | +| `server_native_connections` | gauge | uvicorn's **true** socket connection count, including idle keep-alive. `serve_async` path only; absent for granian/hypercorn. | + +### How It Works + +Three cooperating mechanisms write these meters to the Prometheus registry: + +1. 
**ASGI middleware (primary source).** `ServerMetricsASGIMiddleware` is a pure-ASGI + middleware that wraps the app at the **outermost** layer. It runs in every worker, + for every server and worker count, and emits `server_active_connections`, + `server_in_flight_requests`, and `server_requests_total`. + **Source:** `src/pyfly/web/adapters/starlette/asgi_server_metrics.py` + +2. **ServerMetricsBinder.** Started from the in-worker ASGI lifespan (beside + `register_process_metrics` / `ManagementServer`), it emits `server_workers` (from + the `_PYFLY_WORKERS` env set by `pyfly run`), `server_uptime_seconds`, + `server_started_total`, `server_stopped_total`, and optionally + `server_native_connections`. + **Source:** `src/pyfly/observability/server_metrics.py` + +3. **ServerStatsPort (best-effort).** Implemented by each adapter. On the in-process + `serve_async` path, the uvicorn adapter surfaces its true socket connection count + and total requests from `uvicorn.Server.server_state`; granian and hypercorn report + workers and uptime only (Rust runtime / no handle), so their connection fields are + `None`. + **Source:** `src/pyfly/server/ports/server_stats.py` + +The ASGI middleware is the uniform primary source because of how production runs +work: on the `pyfly run` path, `uvicorn.run(workers=N)` forks worker subprocesses +that each build their **own** server, so the worker's adapter bean is not the object +actually serving and `server_state` is unreachable cross-process. Native stats are +therefore best-effort enrichment layered on top of the in-worker middleware. + +### Multi-Worker and Multiprocess Mode + +When `workers` is greater than 1, `prometheus_client` multiprocess mode is +auto-enabled: `pyfly run` sets `PROMETHEUS_MULTIPROC_DIR` before forking workers, +each worker writes mmap files, and `/actuator/prometheus` aggregates across all +workers via `MultiProcessCollector`. A single scrape therefore reflects **all** +workers. 
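To make the aggregation concrete, the sketch below folds per-worker `server_*` samples, each carrying a `worker_pid` label, into totals plus a per-worker breakdown. It is an illustration only, not PyFly's implementation: `Sample` and `fold_samples` are hypothetical names, and real samples would come from the aggregating registry rather than a hand-written list.

```python
from collections import defaultdict
from typing import NamedTuple


class Sample(NamedTuple):
    """Hypothetical stand-in for one scraped sample (name, labels, value)."""

    name: str
    labels: dict
    value: float


def fold_samples(samples):
    """Sum each metric across workers and keep a per-worker breakdown."""
    totals = defaultdict(float)
    per_worker = defaultdict(lambda: defaultdict(float))
    for sample in samples:
        totals[sample.name] += sample.value
        pid = sample.labels.get("worker_pid")
        if pid is not None:
            # Group the same value under the worker that reported it.
            per_worker[pid][sample.name] += sample.value
    return dict(totals), {pid: dict(metrics) for pid, metrics in per_worker.items()}


# Illustrative values only; two workers reporting to one scrape.
samples = [
    Sample("server_requests_total", {"server": "uvicorn", "worker_pid": "101"}, 40.0),
    Sample("server_requests_total", {"server": "uvicorn", "worker_pid": "102"}, 60.0),
    Sample("server_in_flight_requests", {"server": "uvicorn", "worker_pid": "101"}, 2.0),
]
totals, per_worker = fold_samples(samples)
```

Summing suits counters and additive gauges such as in-flight requests. A gauge like `server_uptime_seconds` is usually better reduced with a maximum than a sum, since adding uptimes across workers has no useful meaning.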
+ +> **Limitation:** Multiprocess mode aggregates only `Counter`/`Gauge`/`Histogram`/`Summary` +> values. Custom Python collectors (the `process_*` / `system_*` metrics) are **not** +> aggregated. The `server_*` and `http_server_requests_*` meters aggregate correctly. + +**Source:** `src/pyfly/observability/multiprocess.py` + +### Server Observability Configuration + +Server-layer observability requires the observability extra (`prometheus_client`) +and degrades to a no-op without it. + +```yaml +pyfly: + server: + observability: + enabled: true # Default: true (enabled by the web and core + # starters; mirrors pyfly.observability.metrics.enabled) + sample-interval-seconds: 5.0 # Default: 5.0 + access-log: false # Default: false; opt-in native server access logging +``` + +| Config Key | Default | Description | +|----------------------------------------------------|---------|-----------------------------------------------------------------------------| +| `pyfly.server.observability.enabled` | `true` | Enable server-layer metrics. Enabled by the web and core starters; mirrors `pyfly.observability.metrics.enabled`. | +| `pyfly.server.observability.sample-interval-seconds` | `5.0` | Sampling interval for the binder. | +| `pyfly.server.observability.access-log` | `false` | Opt-in native server access logging. | + +### Exposition and Dashboard + +The `server_*` meters appear at `/actuator/prometheus` and `/actuator/metrics`. The +admin dashboard also gains a live **Observability** view (under Monitoring) with stat +cards (workers, uptime, active connections, in-flight requests, requests/sec), rolling +charts, a per-worker breakdown table, and links to the Metrics and Traces views. It is +backed by `GET /admin/api/observability` and the SSE stream +`/admin/api/sse/observability`. + +The local stack ships Prometheus and Grafana services (in `docker-compose.yml`) +scraping `/actuator/prometheus`; the scrape config lives in `ops/prometheus/prometheus.yml`. 
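The requests/sec card is a derived value rather than a stored meter. A minimal sketch of deriving it from successive snapshots follows; `RateTracker` is a hypothetical name used only for illustration, and the sketch assumes each snapshot carries a cumulative `requests_total` and a `timestamp` in seconds.

```python
class RateTracker:
    """Hypothetical helper: requests/second from successive snapshots."""

    def __init__(self):
        self._prev_total = None
        self._prev_ts = None

    def update(self, total, ts):
        """Return the rate since the previous snapshot, or None if unknown."""
        rate = None
        if (
            self._prev_total is not None
            and ts > self._prev_ts
            and total >= self._prev_total  # skip counter resets (e.g. a restart)
        ):
            rate = round((total - self._prev_total) / (ts - self._prev_ts), 3)
        self._prev_total, self._prev_ts = total, ts
        return rate


tracker = RateTracker()
first = tracker.update(100, 10.0)   # no previous snapshot yet
second = tracker.update(150, 15.0)  # 50 requests over 5 seconds
third = tracker.update(5, 20.0)     # total went backwards, so no rate
```

Keeping one tracker per consumer means concurrent dashboard sessions do not disturb each other's previous-snapshot state.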
+ +> **Scope note:** gunicorn is **not** added in this release — the stack stays +> async-only ASGI (granian > uvicorn > hypercorn). The `ServerStatsPort` and +> multiprocess design are gunicorn-ready for a future adapter. + +--- + ## Tracing ### @span Decorator diff --git a/docs/modules/server.md b/docs/modules/server.md index eba691a8..34eee0e0 100644 --- a/docs/modules/server.md +++ b/docs/modules/server.md @@ -22,6 +22,7 @@ The PyFly server module provides a pluggable ASGI server and event loop abstract - [Configuration Reference](#configuration-reference) - [ServerProperties](#serverproperties) - [Full YAML Reference](#full-yaml-reference) +- [Server Observability](#server-observability) - [CLI: pyfly run](#cli-pyfly-run) - [Custom Server Adapter](#custom-server-adapter) - [Spring Boot Comparison](#spring-boot-comparison) @@ -346,6 +347,37 @@ pyfly: --- +## Server Observability + +Beyond the application-layer metrics (`http_server_requests_seconds`, tracing/correlation, process metrics), the server adapters emit `server_*` meters describing the ASGI server itself. They are written to the Prometheus registry and auto-exposed at `/actuator/prometheus` (and `/actuator/metrics`). Every meter is labeled `server` (server type) and `worker_pid`. + +The catalog covers connection and request activity (`server_active_connections`, `server_in_flight_requests`, `server_requests_total`), worker lifecycle and uptime (`server_workers`, `server_uptime_seconds`, `server_started_total`, `server_stopped_total`), and, on the Uvicorn in-process serve path only, true socket counts including idle keep-alive (`server_native_connections`; absent for Granian/Hypercorn). The primary source is a pure-ASGI middleware that runs in every worker for every server, so the meters are uniform across the stack. See the [Observability module guide](observability.md) for the full catalog, label semantics, and exposition details. 
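As a rough illustration of the pure-ASGI approach, the sketch below wraps an app and tracks in-flight and completed HTTP requests. It is not the actual `ServerMetricsASGIMiddleware`: `InFlightCounter` and `demo_app` are hypothetical names, and plain integer attributes stand in for the Prometheus gauges and counters.

```python
import asyncio


class InFlightCounter:
    """Hypothetical pure-ASGI middleware that tracks in-flight HTTP requests."""

    def __init__(self, app):
        self.app = app
        self.in_flight = 0
        self.completed = 0

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            # Lifespan and websocket scopes pass straight through in this sketch.
            await self.app(scope, receive, send)
            return
        self.in_flight += 1
        try:
            await self.app(scope, receive, send)
        finally:
            # Runs on success and on error, so the gauge cannot leak upward.
            self.in_flight -= 1
            self.completed += 1


async def demo_app(scope, receive, send):
    """Tiny ASGI app used only to exercise the middleware."""
    await send({"type": "http.response.start", "status": 200, "headers": []})
    await send({"type": "http.response.body", "body": b"ok"})


async def main():
    middleware = InFlightCounter(demo_app)
    sent = []

    async def receive():
        return {"type": "http.request", "body": b"", "more_body": False}

    async def send(message):
        # Record the in-flight count observed while the response is written.
        sent.append((message["type"], middleware.in_flight))

    await middleware({"type": "http"}, receive, send)
    return middleware, sent


middleware, sent = asyncio.run(main())
```

Because the wrapper only depends on the ASGI callable signature, the same object can sit in front of any ASGI server, which is why this style of middleware behaves uniformly across server types.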
+ +### Configuration + +| Field | Type | Default | Description | +|-------|------|---------|-------------| +| `enabled` | `bool` | `true` | Enable server-layer metrics (mirrors `pyfly.observability.metrics.enabled`; on with the web and core starters) | +| `sample-interval-seconds` | `float` | `5.0` | Interval at which gauges are sampled | +| `access-log` | `bool` | `false` | Opt-in native server access logging | + +```yaml +pyfly: + server: + observability: + enabled: true # mirrors pyfly.observability.metrics.enabled + sample-interval-seconds: 5.0 + access-log: false # opt-in native access logging +``` + +Server observability requires the observability extra (`prometheus_client`); without it, it degrades to a no-op. + +### Multi-worker aggregation + +When `workers > 1`, `pyfly run` enables `prometheus_client` multiprocess mode (setting `PROMETHEUS_MULTIPROC_DIR` before forking workers). Each worker writes its own mmap files, and `/actuator/prometheus` aggregates across all workers, so a single scrape reflects every worker. The `server_*` and `http_server_requests_*` meters aggregate correctly; note that custom Python collectors (the `process_*`/`system_*` metrics) are not aggregated by multiprocess mode. + +--- + ## CLI: pyfly run The `pyfly run` command accepts server-related flags that override the YAML configuration: diff --git a/ops/prometheus/prometheus.yml b/ops/prometheus/prometheus.yml new file mode 100644 index 00000000..aef38919 --- /dev/null +++ b/ops/prometheus/prometheus.yml @@ -0,0 +1,25 @@ +# Prometheus scrape config for a locally-running PyFly app. +# +# Scrapes the framework's Micrometer/Prometheus exposition at +# /actuator/prometheus — including the server-layer meters (server_workers, +# server_active_connections, server_in_flight_requests, server_uptime_seconds, +# server_requests_total) and the http_server_requests_seconds family. 
+# +# By default it targets the management port (9090) where actuator lives when +# management.server.port is separated (the pyfly default); change to 8080 if you +# serve actuator on the application port (shared mode). +# +# Under multi-worker (workers > 1) the framework enables prometheus_client +# multiprocess mode, so a single scrape already aggregates across all workers. +global: + scrape_interval: 5s + evaluation_interval: 5s + +scrape_configs: + - job_name: pyfly + metrics_path: /actuator/prometheus + static_configs: + # host.docker.internal lets the container reach the app on the host. + - targets: ["host.docker.internal:9090"] + labels: + application: pyfly-app diff --git a/pyproject.toml b/pyproject.toml index 2fa80297..f9badac7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,7 +7,7 @@ name = "pyfly" # CalVer YY.MM.PATCH — package metadata uses PEP 440 normalized form (26.5.4); # git tag, GitHub release and human-readable display use leading-zero form # (v26.05.04) to match the Java/.NET/Go siblings. -version = "26.6.112" +version = "26.6.113" description = "The official Python implementation of the Firefly Framework — DI, CQRS, EDA, hexagonal architecture, and more." readme = "README.md" license = "Apache-2.0" diff --git a/src/pyfly/__init__.py b/src/pyfly/__init__.py index b7fe128d..50d2c7d5 100644 --- a/src/pyfly/__init__.py +++ b/src/pyfly/__init__.py @@ -13,4 +13,4 @@ # limitations under the License. 
"""PyFly — Enterprise Python Framework.""" -__version__ = "26.06.112" +__version__ = "26.06.113" diff --git a/src/pyfly/actuator/endpoints/prometheus_endpoint.py b/src/pyfly/actuator/endpoints/prometheus_endpoint.py index b2aed60d..506b400d 100644 --- a/src/pyfly/actuator/endpoints/prometheus_endpoint.py +++ b/src/pyfly/actuator/endpoints/prometheus_endpoint.py @@ -54,7 +54,12 @@ async def handle(self, context: Any = None) -> dict[str, Any]: "body": "# prometheus_client is not installed\n", "status": 503, } - output = generate_latest().decode("utf-8") + from pyfly.observability.multiprocess import collect_registry + + # Under multiprocess mode (workers > 1) aggregate every worker's + # mmap-backed metrics; otherwise scrape the process default registry. + # collect_registry() falls back gracefully if the multiproc dir is missing. + output = generate_latest(collect_registry()).decode("utf-8") return { "content_type": _CONTENT_TYPE, "body": output, diff --git a/src/pyfly/admin/adapters/starlette.py b/src/pyfly/admin/adapters/starlette.py index 07fe7eaf..dfefea7a 100644 --- a/src/pyfly/admin/adapters/starlette.py +++ b/src/pyfly/admin/adapters/starlette.py @@ -39,6 +39,7 @@ from pyfly.admin.providers.loggers_provider import LoggersProvider from pyfly.admin.providers.mappings_provider import MappingsProvider from pyfly.admin.providers.metrics_provider import MetricsProvider + from pyfly.admin.providers.observability_provider import ObservabilityProvider from pyfly.admin.providers.overview_provider import OverviewProvider from pyfly.admin.providers.runtime_provider import RuntimeProvider from pyfly.admin.providers.scheduled_provider import ScheduledProvider @@ -92,6 +93,7 @@ def __init__( logfile: LogfileProvider | None = None, runtime: RuntimeProvider | None = None, server: ServerProvider | None = None, + observability: ObservabilityProvider | None = None, instance_registry: InstanceRegistry | None = None, ) -> None: self._props = properties @@ -113,6 +115,7 @@ def 
__init__( self._logfile = logfile self._runtime = runtime self._server = server + self._observability = observability self._instance_registry = instance_registry def _auth_failure(self) -> JSONResponse | None: @@ -195,6 +198,7 @@ def build_routes(self) -> list[Route | Mount]: (f"{api}/logfile/clear", self._handle_logfile_clear, ["POST"]), (f"{api}/runtime", self._handle_runtime, ["GET"]), (f"{api}/server", self._handle_server, ["GET"]), + (f"{api}/observability", self._handle_observability, ["GET"]), (f"{api}/views", self._handle_views, ["GET"]), (f"{api}/settings", self._handle_settings, ["GET"]), (f"{api}/sse/health", self._handle_sse_health, ["GET"]), @@ -203,6 +207,7 @@ def build_routes(self) -> list[Route | Mount]: (f"{api}/sse/logfile", self._handle_sse_logfile, ["GET"]), (f"{api}/sse/runtime", self._handle_sse_runtime, ["GET"]), (f"{api}/sse/server", self._handle_sse_server, ["GET"]), + (f"{api}/sse/observability", self._handle_sse_observability, ["GET"]), (f"{api}/sse/beans", self._handle_sse_beans, ["GET"]), ] @@ -355,6 +360,11 @@ async def _handle_server(self, request: Request) -> JSONResponse: return JSONResponse({"available": False}) return JSONResponse(await self._server.get_server_info()) + async def _handle_observability(self, request: Request) -> JSONResponse: + if self._observability is None: + return JSONResponse({"available": False}) + return JSONResponse(await self._observability.get_observability()) + async def _handle_views(self, request: Request) -> JSONResponse: extensions = self._view_registry.get_extensions() views = [{"id": ext.view_id, "name": ext.display_name, "icon": ext.icon} for ext in extensions.values()] @@ -437,6 +447,14 @@ async def _handle_sse_server(self, request: Request) -> Response: interval = self._props.refresh_interval / 1000 return make_sse_response(server_stream(self._server, interval)) + async def _handle_sse_observability(self, request: Request) -> Response: + from pyfly.admin.api.sse import make_sse_response, 
observability_stream + + if self._observability is None: + return JSONResponse({"available": False}) + interval = self._props.refresh_interval / 1000 + return make_sse_response(observability_stream(self._observability, interval)) + async def _handle_sse_beans(self, request: Request) -> Response: from pyfly.admin.api.sse import beans_stream, make_sse_response diff --git a/src/pyfly/admin/api/sse.py b/src/pyfly/admin/api/sse.py index c342f257..4f2474c3 100644 --- a/src/pyfly/admin/api/sse.py +++ b/src/pyfly/admin/api/sse.py @@ -29,6 +29,7 @@ from pyfly.admin.providers.beans_provider import BeansProvider from pyfly.admin.providers.health_provider import HealthProvider from pyfly.admin.providers.metrics_provider import MetricsProvider + from pyfly.admin.providers.observability_provider import ObservabilityProvider from pyfly.admin.providers.runtime_provider import RuntimeProvider from pyfly.admin.providers.server_provider import ServerProvider @@ -139,6 +140,37 @@ async def server_stream( _logger.debug("sse_stream_closed", extra={"stream": "server"}) +async def observability_stream( + observability_provider: ObservabilityProvider, + interval: float = 5.0, +) -> AsyncGenerator[str, None]: + # requests/second is computed PER STREAM from successive snapshots, so it is + # not corrupted by sharing the provider across the REST endpoint and multiple + # concurrent SSE consumers. 
+ prev_total: float | None = None + prev_ts: float | None = None + try: + while True: + data = await observability_provider.get_observability() + total = data.get("requests_total") + ts = data.get("timestamp") + if ( + prev_total is not None + and prev_ts is not None + and total is not None + and ts is not None + and ts > prev_ts + and total >= prev_total + ): + data["requests_per_second"] = round((total - prev_total) / (ts - prev_ts), 3) + if total is not None and ts is not None: + prev_total, prev_ts = total, ts + yield _sse_event(data, event="observability") + await asyncio.sleep(interval) + except asyncio.CancelledError: + _logger.debug("sse_stream_closed", extra={"stream": "observability"}) + + async def beans_stream( beans_provider: BeansProvider, interval: float = 10.0, diff --git a/src/pyfly/admin/providers/observability_provider.py b/src/pyfly/admin/providers/observability_provider.py new file mode 100644 index 00000000..d30db7d4 --- /dev/null +++ b/src/pyfly/admin/providers/observability_provider.py @@ -0,0 +1,197 @@ +# Copyright 2026 Firefly Software Foundation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Observability data provider — live server-layer metrics for the admin view. + +Sources the ``server_*`` meters (and the ASGI server-metrics gauges) the +framework emits into the Prometheus registry, plus static ``ServerInfo``. 
Under +prometheus_client multiprocess mode it reads the aggregating registry so the +numbers reflect ALL workers, not just the one answering the admin request. + +The snapshot shape (REST ``GET /admin/api/observability`` and the SSE +``observability`` event) is server-observability-specific: identity, aggregate +gauges, a per-worker breakdown (keyed by the ``worker_pid`` label), and a derived +requests/second. +""" + +from __future__ import annotations + +import os +import time +from typing import Any + +# server_* sample names this view surfaces. +_ACTIVE = "server_active_connections" +_IN_FLIGHT = "server_in_flight_requests" +_REQUESTS = "server_requests_total" +_WORKERS = "server_workers" +_UPTIME = "server_uptime_seconds" +_STARTED = "server_started_total" +_STOPPED = "server_stopped_total" +_NATIVE_CONNS = "server_native_connections" + + +class ObservabilityProvider: + """Provides live server-layer observability for the admin dashboard.""" + + def __init__(self, context: Any = None) -> None: + self._context = context + + # -- server identity (static ServerInfo, like ServerProvider) ----------- + + def _resolve_server(self) -> Any: + if self._context is None: + return None + try: + from pyfly.server.ports.outbound import ApplicationServerPort + except ImportError: + return None + for _cls, reg in self._context.container._registrations.items(): + if reg.instance is not None and isinstance(reg.instance, ApplicationServerPort): + return reg.instance + return None + + def _server_identity(self) -> dict[str, Any]: + server = self._resolve_server() + env_type = os.environ.get("_PYFLY_SERVER_TYPE") + if server is None: + return { + "name": env_type or "unknown", + "version": "unknown", + "event_loop": os.environ.get("_PYFLY_EVENT_LOOP", "unknown"), + "http": os.environ.get("_PYFLY_HTTP", "unknown"), + "host": os.environ.get("_PYFLY_SERVER_HOST", "unknown"), + "port": int(os.environ.get("_PYFLY_SERVER_PORT", "0") or 0), + } + info = server.server_info + return { + 
"name": info.name, + "version": info.version, + "event_loop": info.event_loop, + "http": info.http_protocol, + "host": info.host, + "port": info.port, + } + + # -- metric reading (multiprocess-aware) -------------------------------- + + @staticmethod + def _collect_server_samples() -> tuple[list[Any], bool]: + """Return (samples, multiprocess) for ``server_*`` metric families.""" + try: + import prometheus_client # noqa: F401 - availability probe + except ImportError: + return [], False + + from pyfly.observability.multiprocess import collect_registry, is_multiprocess + + multiprocess = is_multiprocess() + registry = collect_registry() + samples: list[Any] = [] + for metric in registry.collect(): + if not metric.name.startswith("server"): + continue + for sample in metric.samples: + if sample.name.endswith("_created"): + continue + if sample.name.startswith("server_"): + samples.append(sample) + return samples, multiprocess + + async def get_observability(self) -> dict[str, Any]: + # Honor the feature flag: when server observability is disabled, report + # unavailable so the dashboard renders its 'disabled' empty-state instead + # of a populated-with-zeros view (the middleware/binder are not installed). + if self._context is not None and not self._is_enabled(): + return { + "timestamp": time.time(), + "available": False, + "has_prometheus": self._has_prometheus(), + "multiprocess": False, + } + + samples, multiprocess = self._collect_server_samples() + has_prometheus = self._has_prometheus() + + # Aggregate totals (summed across worker_pid) + per-worker breakdown. 
+ totals: dict[str, float] = {} + per_worker: dict[str, dict[str, Any]] = {} + for sample in samples: + totals[sample.name] = totals.get(sample.name, 0.0) + sample.value + pid = sample.labels.get("worker_pid") + if pid is not None: + worker = per_worker.setdefault(pid, {"pid": pid}) + worker[sample.name] = worker.get(sample.name, 0.0) + sample.value + + active = totals.get(_ACTIVE) + in_flight = totals.get(_IN_FLIGHT, 0.0) + requests_total = totals.get(_REQUESTS) + # server_workers is summed across pids; the live worker count is the + # number of distinct reporting workers (each reports its own count). + worker_count = len(per_worker) or int(totals.get(_WORKERS, 0.0)) + uptime = max((w.get(_UPTIME, 0.0) for w in per_worker.values()), default=totals.get(_UPTIME, 0.0)) + + return { + "timestamp": time.time(), + "available": True, + "has_prometheus": has_prometheus, + "multiprocess": multiprocess, + "server": self._server_identity(), + "workers": worker_count, + "uptime_seconds": uptime, + "active_connections": int(active) if active is not None else None, + "in_flight_requests": int(in_flight), + "requests_total": int(requests_total) if requests_total is not None else None, + # Snapshot default; the SSE stream computes the live per-consumer rate + # from successive samples (so it is not corrupted by shared state). 
+ "requests_per_second": 0.0, + "started_total": int(totals.get(_STARTED, 0.0)), + "stopped_total": int(totals.get(_STOPPED, 0.0)), + "per_worker": [self._worker_row(w) for w in per_worker.values()], + "lifecycle": { + "started_total": int(totals.get(_STARTED, 0.0)), + "stopped_total": int(totals.get(_STOPPED, 0.0)), + }, + } + + @staticmethod + def _worker_row(worker: dict[str, Any]) -> dict[str, Any]: + native = worker.get(_NATIVE_CONNS) + return { + "pid": worker.get("pid"), + "uptime_seconds": worker.get(_UPTIME, 0.0), + "in_flight_requests": int(worker.get(_IN_FLIGHT, 0.0)), + "active_connections": int(worker.get(_ACTIVE, 0.0)), + "requests_total": int(worker.get(_REQUESTS, 0.0)), + # `is not None` (not truthiness) so a real 0 isn't shown as "n/a". + "native_connections": int(native) if native is not None else None, + } + + def _is_enabled(self) -> bool: + try: + return str(self._context.config.get("pyfly.server.observability.enabled", "true")).lower() in ( + "true", + "1", + "yes", + ) + except Exception: # noqa: BLE001 - default to enabled if config is unreadable + return True + + @staticmethod + def _has_prometheus() -> bool: + try: + import prometheus_client # noqa: F401 + + return True + except ImportError: + return False diff --git a/src/pyfly/admin/static/js/app.js b/src/pyfly/admin/static/js/app.js index 10de6b14..b5fcdd5c 100644 --- a/src/pyfly/admin/static/js/app.js +++ b/src/pyfly/admin/static/js/app.js @@ -32,6 +32,7 @@ const routes = { 'instances': () => import('./views/instances.js'), 'bean-graph': () => import('./views/bean-graph.js'), 'runtime': () => import('./views/runtime.js'), + 'observability': () => import('./views/observability.js'), 'wallboard': () => import('./views/wallboard.js'), }; diff --git a/src/pyfly/admin/static/js/components/sidebar.js b/src/pyfly/admin/static/js/components/sidebar.js index 977922cd..bf4cac21 100644 --- a/src/pyfly/admin/static/js/components/sidebar.js +++ b/src/pyfly/admin/static/js/components/sidebar.js @@ 
-37,6 +37,7 @@ export const NAV_ITEMS = [ { id: 'config', label: 'Configuration', icon: 'cog' }, { id: 'loggers', label: 'Loggers', icon: 'list' }, { id: 'metrics', label: 'Metrics', icon: 'chart', section: 'Monitoring' }, + { id: 'observability', label: 'Observability', icon: 'activity' }, { id: 'scheduled', label: 'Scheduled', icon: 'clock' }, { id: 'traces', label: 'Traces', icon: 'activity' }, { id: 'mappings', label: 'Mappings', icon: 'route', section: 'Infrastructure' }, diff --git a/src/pyfly/admin/static/js/views/observability.js b/src/pyfly/admin/static/js/views/observability.js new file mode 100644 index 00000000..33b12f43 --- /dev/null +++ b/src/pyfly/admin/static/js/views/observability.js @@ -0,0 +1,263 @@ +/** + * PyFly Admin — Observability View. + * + * Live server-layer observability: worker count, server uptime, active + * connections, in-flight requests, and requests/second — sourced from the + * framework's server_* Prometheus meters (aggregated across workers under + * multiprocess mode). Shows stat cards, rolling line charts, a per-worker + * breakdown table, lifecycle counters, and links into the Metrics / Traces views. + * + * Data source: GET /admin/api/observability + SSE /observability (event: observability) + */ + +import { createLineChart } from '../charts.js'; +import { createEmptyStateCard } from '../components/empty-state.js'; +import { pageSkeleton } from '../components/skeleton.js'; +import { createTable } from '../components/table.js'; +import { sse } from '../sse.js'; + +const MAX_DATA_POINTS = 60; + +function nowLabel(timestamp) { + return new Date((timestamp ? 
timestamp * 1000 : Date.now())).toLocaleTimeString(); +} + +function pushRolling(arr, value) { + arr.push(value); + if (arr.length > MAX_DATA_POINTS) arr.shift(); +} + +function fmtUptime(seconds) { + if (seconds == null) return '--'; + const s = Math.floor(seconds % 60); + const m = Math.floor((seconds / 60) % 60); + const h = Math.floor(seconds / 3600); + if (h > 0) return `${h}h ${m}m`; + if (m > 0) return `${m}m ${s}s`; + return `${s}s`; +} + +function statCard(label, value, iconClass = 'primary') { + const card = document.createElement('div'); + card.className = 'stat-card'; + const content = document.createElement('div'); + content.className = 'stat-card-content'; + const valEl = document.createElement('div'); + valEl.className = 'stat-card-value'; + valEl.textContent = value != null ? String(value) : '--'; + content.appendChild(valEl); + const labelEl = document.createElement('div'); + labelEl.className = 'stat-card-label'; + labelEl.textContent = label; + content.appendChild(labelEl); + card.appendChild(content); + const icon = document.createElement('div'); + icon.className = `stat-card-icon ${iconClass}`; + card.appendChild(icon); + return card; +} + +function chartCard(title) { + const card = document.createElement('div'); + card.className = 'admin-card'; + const header = document.createElement('div'); + header.className = 'admin-card-header'; + const h3 = document.createElement('h3'); + h3.textContent = title; + header.appendChild(h3); + card.appendChild(header); + const body = document.createElement('div'); + body.className = 'admin-card-body'; + body.style.height = '200px'; + const canvas = document.createElement('canvas'); + body.appendChild(canvas); + card.appendChild(body); + return { card, canvas }; +} + +/** + * Render the observability view. 
+ * @param {HTMLElement} container + * @param {import('../api.js').AdminAPI} api + * @returns {Promise} Cleanup function + */ +export async function render(container, api) { + container.replaceChildren(); + + const wrapper = document.createElement('div'); + wrapper.className = 'view-enter'; + + const header = document.createElement('div'); + header.className = 'page-header'; + const headerLeft = document.createElement('div'); + const h1 = document.createElement('h1'); + h1.textContent = 'Observability'; + headerLeft.appendChild(h1); + const sub = document.createElement('div'); + sub.className = 'page-subtitle'; + sub.textContent = 'server.layer'; + headerLeft.appendChild(sub); + header.appendChild(headerLeft); + wrapper.appendChild(header); + + const loader = document.createElement('div'); + loader.appendChild(pageSkeleton({ stats: 4, rows: 4 })); + wrapper.appendChild(loader); + container.appendChild(wrapper); + + let data; + try { + data = await api.get('/observability'); + } catch (err) { + wrapper.removeChild(loader); + wrapper.appendChild(createEmptyStateCard({ + icon: 'alert', + tone: 'danger', + title: 'Failed to load observability data', + text: err.message, + })); + return; + } + wrapper.removeChild(loader); + + if (data.available === false || data.has_prometheus === false) { + wrapper.appendChild(createEmptyStateCard({ + icon: 'activity', + title: 'Server observability is disabled', + text: 'Enable pyfly.server.observability and install the observability extra (prometheus_client).', + })); + return; + } + + const server = data.server || {}; + + // ── Stat cards ─────────────────────────────────────────── + const statsRow = document.createElement('div'); + statsRow.className = 'grid-4 mb-lg'; + const workersEl = statCard('Workers', data.workers, 'primary'); + const uptimeEl = statCard('Server Uptime', fmtUptime(data.uptime_seconds), 'success'); + const connEl = statCard('Active Connections', data.active_connections ?? 
'--', 'info'); + const inflightEl = statCard('In-flight Requests', data.in_flight_requests ?? 0, 'warning'); + statsRow.append(workersEl, uptimeEl, connEl, inflightEl); + wrapper.appendChild(statsRow); + + const statsRow2 = document.createElement('div'); + statsRow2.className = 'grid-4 mb-lg'; + const rpsEl = statCard('Requests / sec', Number(data.requests_per_second ?? 0).toFixed(2), 'primary'); + const reqTotalEl = statCard('Requests Total', data.requests_total ?? '--', 'info'); + const serverEl = statCard('Server', server.name ? `${server.name} ${server.version || ''}`.trim() : '--', 'success'); + const mpEl = statCard('Multi-worker', data.multiprocess ? 'aggregated' : 'single', 'warning'); + statsRow2.append(rpsEl, reqTotalEl, serverEl, mpEl); + wrapper.appendChild(statsRow2); + + const valueEls = { + workers: workersEl.querySelector('.stat-card-value'), + uptime: uptimeEl.querySelector('.stat-card-value'), + conn: connEl.querySelector('.stat-card-value'), + inflight: inflightEl.querySelector('.stat-card-value'), + rps: rpsEl.querySelector('.stat-card-value'), + reqTotal: reqTotalEl.querySelector('.stat-card-value'), + }; + + // ── Rolling charts ─────────────────────────────────────── + const labels = []; + const connData = []; + const inflightData = []; + const rpsData = []; + labels.push(nowLabel(data.timestamp)); + connData.push(data.active_connections || 0); + inflightData.push(data.in_flight_requests || 0); + rpsData.push(data.requests_per_second || 0); + + const chartsRow = document.createElement('div'); + chartsRow.className = 'grid-2 mb-lg'; + const { card: connCard, canvas: connCanvas } = chartCard('Active Connections'); + const { card: inflightCard, canvas: inflightCanvas } = chartCard('In-flight Requests'); + const { card: rpsCard, canvas: rpsCanvas } = chartCard('Requests / sec'); + chartsRow.append(connCard, inflightCard, rpsCard); + wrapper.appendChild(chartsRow); + + // ── Per-worker
breakdown ───────────────────────────────── + const workerCard = document.createElement('div'); + workerCard.className = 'admin-card mb-lg'; + const wHeader = document.createElement('div'); + wHeader.className = 'admin-card-header'; + const wh3 = document.createElement('h3'); + wh3.textContent = 'Per-worker breakdown'; + wHeader.appendChild(wh3); + workerCard.appendChild(wHeader); + const wBody = document.createElement('div'); + wBody.className = 'admin-card-body'; + workerCard.appendChild(wBody); + wrapper.appendChild(workerCard); + + function renderWorkerTable(rows) { + wBody.replaceChildren(); + wBody.appendChild(createTable({ + columns: [ + { key: 'pid', label: 'Worker PID', sortable: true }, + { key: 'uptime_seconds', label: 'Uptime', render: (v) => fmtUptime(v) }, + { key: 'active_connections', label: 'Connections' }, + { key: 'in_flight_requests', label: 'In-flight' }, + { key: 'requests_total', label: 'Requests' }, + { key: 'native_connections', label: 'Native conns', render: (v) => (v == null ? 
'n/a' : String(v)) }, + ], + data: rows || [], + emptyText: 'No workers reporting', + })); + } + renderWorkerTable(data.per_worker); + + // ── Links into Metrics / Traces ────────────────────────── + const links = document.createElement('div'); + links.className = 'mb-lg'; + links.style.display = 'flex'; + links.style.gap = '12px'; + for (const [hash, text] of [['#metrics', 'View all metrics →'], ['#traces', 'View traces →']]) { + const a = document.createElement('a'); + a.href = hash; + a.textContent = text; + a.className = 'admin-link'; + links.appendChild(a); + } + wrapper.appendChild(links); + + // ── Charts init ────────────────────────────────────────── + let connChart = null; + let inflightChart = null; + let rpsChart = null; + requestAnimationFrame(() => { + connChart = createLineChart(connCanvas, { label: 'Active Connections', color: '--admin-info', data: [...connData], labels: [...labels] }); + inflightChart = createLineChart(inflightCanvas, { label: 'In-flight Requests', color: '--admin-warning', data: [...inflightData], labels: [...labels] }); + rpsChart = createLineChart(rpsCanvas, { label: 'Requests / sec', color: '--admin-primary', data: [...rpsData], labels: [...labels] }); + }); + + // ── SSE live updates ───────────────────────────────────── + sse.connectTyped('/observability', 'observability', (ev) => { + const label = nowLabel(ev.timestamp); + pushRolling(labels, label); + pushRolling(connData, ev.active_connections || 0); + pushRolling(inflightData, ev.in_flight_requests || 0); + pushRolling(rpsData, ev.requests_per_second || 0); + + valueEls.workers.textContent = ev.workers != null ? String(ev.workers) : '--'; + valueEls.uptime.textContent = fmtUptime(ev.uptime_seconds); + valueEls.conn.textContent = ev.active_connections != null ? String(ev.active_connections) : '--'; + valueEls.inflight.textContent = String(ev.in_flight_requests ?? 0); + valueEls.rps.textContent = (ev.requests_per_second ?? 0).toFixed ? 
(ev.requests_per_second ?? 0).toFixed(2) : String(ev.requests_per_second ?? 0); + valueEls.reqTotal.textContent = ev.requests_total != null ? String(ev.requests_total) : '--'; + + if (connChart) connChart.update([...connData], [...labels]); + if (inflightChart) inflightChart.update([...inflightData], [...labels]); + if (rpsChart) rpsChart.update([...rpsData], [...labels]); + + renderWorkerTable(ev.per_worker); + }); + + return function cleanup() { + sse.disconnect('/observability'); + if (connChart) connChart.destroy(); + if (inflightChart) inflightChart.destroy(); + if (rpsChart) rpsChart.destroy(); + }; +} diff --git a/src/pyfly/admin/wiring.py b/src/pyfly/admin/wiring.py index 091e2fea..11f0dba7 100644 --- a/src/pyfly/admin/wiring.py +++ b/src/pyfly/admin/wiring.py @@ -56,6 +56,7 @@ def build_admin_routes( from pyfly.admin.providers.loggers_provider import LoggersProvider from pyfly.admin.providers.mappings_provider import MappingsProvider from pyfly.admin.providers.metrics_provider import MetricsProvider + from pyfly.admin.providers.observability_provider import ObservabilityProvider from pyfly.admin.providers.overview_provider import OverviewProvider from pyfly.admin.providers.runtime_provider import RuntimeProvider from pyfly.admin.providers.scheduled_provider import ScheduledProvider @@ -132,6 +133,7 @@ def _install_admin_indicators(_agg: HealthAggregator = health_agg) -> None: logfile=LogfileProvider(context), runtime=RuntimeProvider(), server=ServerProvider(context=context), + observability=ObservabilityProvider(context=context), instance_registry=admin_instance_registry, ) return list(admin_builder.build_routes()) diff --git a/src/pyfly/cli/run.py b/src/pyfly/cli/run.py index 0f9f4add..f0337fff 100644 --- a/src/pyfly/cli/run.py +++ b/src/pyfly/cli/run.py @@ -175,8 +175,26 @@ def run_command( os.environ["_PYFLY_BANNER_PRINTED"] = "1" os.environ["_PYFLY_WORKERS"] = str(config.workers) - # Pass server configuration to workers so they can log server info - 
os.environ["_PYFLY_SERVER_TYPE"] = config.type + # Multi-worker: enable prometheus multiprocess aggregation BEFORE the server + # forks workers, so each inherits PROMETHEUS_MULTIPROC_DIR and the server-level + # (and http/process) meters aggregate across workers at /actuator/prometheus. + if config.workers > 1: + from pyfly.observability.multiprocess import init_multiprocess_dir + + init_multiprocess_dir(config.workers) + + # Pass server configuration to workers so they can log server info. + # Resolve the concrete server type when 'auto' so the value is uvicorn / + # granian / hypercorn — not the literal "auto" sentinel — which is what the + # startup log and the server_* metric `server` label key off of. + resolved_type = config.type + if resolved_type == "auto": + try: + resolved_type = server_adapter.server_info.name + except Exception: # noqa: BLE001 - fall back to the sentinel if unavailable + resolved_type = config.type + config.type = resolved_type + os.environ["_PYFLY_SERVER_TYPE"] = resolved_type os.environ["_PYFLY_SERVER_HOST"] = host os.environ["_PYFLY_SERVER_PORT"] = str(port) os.environ["_PYFLY_EVENT_LOOP"] = config.event_loop diff --git a/src/pyfly/config/properties/server.py b/src/pyfly/config/properties/server.py index 1c5b424a..d2a20c02 100644 --- a/src/pyfly/config/properties/server.py +++ b/src/pyfly/config/properties/server.py @@ -34,6 +34,22 @@ class GranianProperties: respawn_failed_workers: bool = True +@dataclass +class ServerObservabilityProperties: + """Server-layer observability tuning (pyfly.server.observability.*). + + ``enabled`` gates the pure-ASGI server-metrics middleware + the + ServerMetricsBinder (mirrors ``pyfly.observability.metrics.enabled``). + ``sample_interval_seconds`` is how often the binder refreshes the + worker/uptime gauges. ``access_log`` opts into the server's native + access logging (off by default to preserve the WARNING-only posture). 
+ """ + + enabled: bool = True + sample_interval_seconds: float = 5.0 + access_log: bool = False + + @config_properties(prefix="pyfly.server") @dataclass class ServerProperties: @@ -58,6 +74,7 @@ class ServerProperties: max_concurrent_connections: int | None = None max_requests_per_worker: int | None = None granian: GranianProperties = field(default_factory=GranianProperties) + observability: ServerObservabilityProperties = field(default_factory=ServerObservabilityProperties) def resolve_app_port(config: Config) -> int: diff --git a/src/pyfly/observability/multiprocess.py b/src/pyfly/observability/multiprocess.py new file mode 100644 index 00000000..3a4477d2 --- /dev/null +++ b/src/pyfly/observability/multiprocess.py @@ -0,0 +1,141 @@ +# Copyright 2026 Firefly Software Foundation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""prometheus_client multiprocess-mode support for multi-worker deployments. + +When a pyfly app runs with ``workers > 1`` (``uvicorn.run(workers=N)``, +Granian/Hypercorn pre-fork), each worker is a separate process with its own +default Prometheus ``REGISTRY``. A single ``/actuator/prometheus`` scrape would +then reflect only the one worker that answered. prometheus_client's multiprocess +mode fixes this: each worker writes its metric values to mmap files in a shared +directory, and the scrape aggregates across all of them via +``MultiProcessCollector``. 
+ +The directory MUST be set in ``PROMETHEUS_MULTIPROC_DIR`` **before the first +metric is created** in every worker. :func:`init_multiprocess_dir` is therefore +called from ``pyfly run`` *before* the server forks workers, so the env var is +inherited by every worker process. + +Limitation: custom Python collectors (e.g. the process/system metrics collector) +are NOT aggregated by ``MultiProcessCollector`` — only ``Counter`` / ``Gauge`` / +``Histogram`` / ``Summary`` values written to the mmap files are. The ``server_*`` +and ``http_server_requests_*`` meters are real Counters/Gauges, so they aggregate +correctly; process gauges fall back to single-process semantics. +""" + +from __future__ import annotations + +import atexit +import contextlib +import glob +import os +import shutil +import tempfile +from typing import Any + +_ENV = "PROMETHEUS_MULTIPROC_DIR" +_DIR_PREFIX = "pyfly-prometheus-mp-" + + +def is_multiprocess() -> bool: + """True when prometheus_client multiprocess mode is active.""" + return bool(os.environ.get(_ENV)) + + +def init_multiprocess_dir(workers: int) -> str | None: + """Create + register a multiprocess metrics dir for a multi-worker run. + + Must be called BEFORE the server forks workers and before any metric is + created. No-ops (returns ``None``) for a single worker, or returns the + pre-existing dir if ``PROMETHEUS_MULTIPROC_DIR`` is already set by the + operator. The fresh dir is cleared so stale series from a previous run of the + same launcher pid do not leak into the new run. + """ + if workers <= 1: + return None + existing = os.environ.get(_ENV) + if existing: + return existing + _sweep_stale_dirs() + path = os.path.join(tempfile.gettempdir(), f"{_DIR_PREFIX}{os.getpid()}") + os.makedirs(path, exist_ok=True) + _clear_dir(path) + os.environ[_ENV] = path + # Clean up our own dir on launcher exit so mmap files don't accumulate in tmp + # across restarts. 
Only the launcher (which created the dir) registers this; + # forked workers inherit the env var but not this atexit hook. + _launcher_pid = os.getpid() + + def _cleanup() -> None: # pragma: no cover - runs at interpreter exit + if os.getpid() == _launcher_pid: + shutil.rmtree(path, ignore_errors=True) + + atexit.register(_cleanup) + return path + + +def _clear_dir(path: str) -> None: + for db_file in glob.glob(os.path.join(path, "*.db")): + with contextlib.suppress(OSError): + os.remove(db_file) + + +def _sweep_stale_dirs() -> None: + """Remove leftover pyfly multiprocess dirs from crashed prior launches.""" + pattern = os.path.join(tempfile.gettempdir(), f"{_DIR_PREFIX}*") + for stale in glob.glob(pattern): + with contextlib.suppress(OSError): + shutil.rmtree(stale, ignore_errors=True) + + +def build_multiprocess_registry() -> Any: + """A fresh registry that aggregates every worker's mmap-backed metrics. + + Use this (instead of the default ``REGISTRY``) to scrape in multiprocess mode. + """ + from prometheus_client import CollectorRegistry + from prometheus_client.multiprocess import MultiProcessCollector + + registry = CollectorRegistry() + MultiProcessCollector(registry) # type: ignore[no-untyped-call] + return registry + + +def collect_registry() -> Any: + """Return the registry to scrape: aggregating in multiprocess mode, else default. + + Tolerant: if ``PROMETHEUS_MULTIPROC_DIR`` is set but the directory is missing + or unreadable, it is (re)created when possible and otherwise falls back to the + process default ``REGISTRY`` rather than letting the scrape raise a 500. 
+ """ + from prometheus_client import REGISTRY + + if not is_multiprocess(): + return REGISTRY + try: + path = os.environ.get(_ENV) + if path and not os.path.isdir(path): + os.makedirs(path, exist_ok=True) + return build_multiprocess_registry() + except Exception: # noqa: BLE001 - never let multiprocess setup break the scrape + return REGISTRY + + +def mark_worker_dead(pid: int) -> None: + """Drop a dead worker's live-gauge contributions (call on worker exit).""" + if not is_multiprocess(): + return + with contextlib.suppress(Exception): + from prometheus_client.multiprocess import mark_process_dead + + mark_process_dead(pid) # type: ignore[no-untyped-call] diff --git a/src/pyfly/observability/server_metrics.py b/src/pyfly/observability/server_metrics.py new file mode 100644 index 00000000..6839d104 --- /dev/null +++ b/src/pyfly/observability/server_metrics.py @@ -0,0 +1,233 @@ +# Copyright 2026 Firefly Software Foundation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Server lifecycle + identity meters, bound from the in-worker ASGI lifespan. + +The :class:`ServerMetricsBinder` runs inside the wrapped ASGI lifespan (the one +adapter-agnostic place that executes in EVERY worker on the real event loop — +beside ``register_process_metrics`` and ``ManagementServer.start``). 
It emits the +server-layer meters that do not need a live server handle: + + ``server_workers`` (gauge) — configured worker count (from + ``_PYFLY_WORKERS``); per ``worker_pid`` + ``server_uptime_seconds`` (gauge) — seconds since this worker bound, refreshed + on a sampling tick + ``server_started_total`` (counter) — incremented once per worker on startup + ``server_stopped_total`` (counter) — incremented once per worker on graceful stop + ``server_native_connections`` (gauge) — OPTIONAL: uvicorn's true socket count, set + only when a :class:`ServerStatsPort` sample + provides it (``None`` elsewhere) + +The live ASGI-scope connection / in-flight / request gauges are owned by the +pure-ASGI :mod:`~pyfly.web.adapters.starlette.asgi_server_metrics` middleware +(single writer per metric). +""" + +from __future__ import annotations + +import asyncio +import contextlib +import logging +import os +import time +from typing import Any + +try: + from prometheus_client import Counter, Gauge + + _HAS_PROMETHEUS = True +except ImportError: # pragma: no cover - exercised only without the observability extra + Counter = None # type: ignore[assignment,misc] + Gauge = None # type: ignore[assignment,misc] + _HAS_PROMETHEUS = False + +from pyfly.server.ports.server_stats import ServerStatsPort + +_LABELS = ["server", "worker_pid"] +_logger = logging.getLogger("pyfly.observability.server") + +# Process-global collectors (one set per process, like the request timer). 
+_collectors: dict[str, Any] | None = None + + +def _get_binder_collectors() -> dict[str, Any]: + """Get-or-create the process-global server lifecycle/identity meters.""" + global _collectors + if _collectors is None: + _collectors = { + "workers": Gauge( + "server_workers", "Configured server worker processes", _LABELS, multiprocess_mode="liveall" + ), + "uptime": Gauge( + "server_uptime_seconds", "Seconds since this server worker bound", _LABELS, multiprocess_mode="liveall" + ), + "native_conns": Gauge( + "server_native_connections", + "Server-native socket connections (uvicorn; incl. idle keep-alive)", + _LABELS, + multiprocess_mode="livesum", + ), + "started": Counter("server_started", "Server worker startups", _LABELS), + "stopped": Counter("server_stopped", "Server worker graceful stops", _LABELS), + } + return _collectors + + +def reset_collectors() -> None: + """Unregister and drop the global collectors. Test-support only.""" + global _collectors + if _collectors is None: + return + from prometheus_client import REGISTRY + + for collector in _collectors.values(): + with contextlib.suppress(KeyError, ValueError): + REGISTRY.unregister(collector) + _collectors = None + + +def resolve_worker_count(fallback: int = 1) -> int: + """Worker count from the ``_PYFLY_WORKERS`` env (set by ``cli/run.py``).""" + raw = os.environ.get("_PYFLY_WORKERS") + if raw: + with contextlib.suppress(ValueError): + return max(1, int(raw)) + return max(1, fallback) + + +def build_binder_for_context(context: Any, *, sample_interval: float = 5.0) -> ServerMetricsBinder | None: + """Build a :class:`ServerMetricsBinder` for a worker from its context. + + Server identity comes from the ``_PYFLY_SERVER_TYPE`` env var (set by + ``cli/run.py`` and inherited by forked workers), falling back to the + ``ApplicationServerPort`` bean's ``ServerInfo``. The same adapter bean + supplies the optional :class:`ServerStatsPort` enrichment when it implements + it. 
Shared by the Starlette and FastAPI ``create_app`` lifespans (DRY). + """ + if context is None: + return None + server_name = os.environ.get("_PYFLY_SERVER_TYPE", "") or "" + # "auto" is the unresolved server-type sentinel — fall back to the concrete + # adapter name so the `server` metric label is uvicorn/granian/hypercorn. + if server_name == "auto": + server_name = "" + stats_port: ServerStatsPort | None = None + try: + from pyfly.server.ports.outbound import ApplicationServerPort + + adapter = context.get_bean(ApplicationServerPort) + if not server_name: + server_name = adapter.server_info.name + if isinstance(adapter, ServerStatsPort): + stats_port = adapter + except Exception as exc: # noqa: BLE001 - binder is best-effort; never block startup + _logger.debug("server_stats_adapter_unavailable", exc_info=exc) + stats_port = None + return ServerMetricsBinder( + server_name=server_name or "unknown", + workers=resolve_worker_count(), + stats_port=stats_port, + sample_interval=sample_interval, + ) + + +class ServerMetricsBinder: + """Binds server lifecycle/identity meters for the lifetime of a worker.""" + + def __init__( + self, + *, + server_name: str, + workers: int, + stats_port: ServerStatsPort | None = None, + sample_interval: float = 5.0, + ) -> None: + self._server_name = server_name + self._workers = max(1, workers) + self._stats_port = stats_port + self._sample_interval = max(0.001, sample_interval) + self._labels = (server_name, str(os.getpid())) + self._start_monotonic: float | None = None + self._task: asyncio.Task[None] | None = None + + async def start(self) -> None: + """Register meters, mark startup, and launch the refresh task.""" + if not _HAS_PROMETHEUS: + return + self._start_monotonic = time.monotonic() + with contextlib.suppress(Exception): + _get_binder_collectors()["started"].labels(*self._labels).inc() + await self._refresh() + self._task = asyncio.create_task(self._run()) + + async def stop(self) -> None: + """Cancel the refresh task, set 
final uptime, mark graceful stop. + + Hardened so a sampling task that died mid-life (its stored exception is + re-raised by ``await``) can NEVER prevent the final refresh, the + ``server_stopped_total`` increment, or the multiprocess dead-worker + cleanup from running — i.e. shutdown is always graceful. + """ + if not _HAS_PROMETHEUS: + return + if self._task is not None: + self._task.cancel() + # suppress BOTH the normal CancelledError and any exception a + # dead task stored, so cleanup below always proceeds. + with contextlib.suppress(asyncio.CancelledError, Exception): + await self._task + self._task = None + await self._refresh() + with contextlib.suppress(Exception): + _get_binder_collectors()["stopped"].labels(*self._labels).inc() + # Multiprocess: drop this worker's live-gauge contributions so a recycled + # worker's stale series do not linger in the aggregated scrape. + with contextlib.suppress(Exception): + from pyfly.observability.multiprocess import mark_worker_dead + + mark_worker_dead(os.getpid()) + + def _uptime(self) -> float: + if self._start_monotonic is None: + return 0.0 + return max(0.0, time.monotonic() - self._start_monotonic) + + async def _refresh(self) -> None: + """Update the gauges; never raises. + + Every gauge write AND the (synchronous) ``sample()`` call are guarded: + a transient failure (e.g. an mmap write error under multiprocess disk + pressure, or a slow/raising third-party ``ServerStatsPort``) skips one + tick instead of killing the sampler. ``sample()`` runs in a worker thread + so a slow implementation cannot stall the event loop or delay drain. 
+ """ + try: + cols = _get_binder_collectors() + cols["workers"].labels(*self._labels).set(self._workers) + cols["uptime"].labels(*self._labels).set(self._uptime()) + if self._stats_port is not None: + sample = await asyncio.to_thread(self._stats_port.sample) + if sample is not None and sample.active_connections is not None: + cols["native_conns"].labels(*self._labels).set(sample.active_connections) + except Exception as exc: # noqa: BLE001 - the binder must never crash the worker + _logger.debug("server_metrics_refresh_failed", exc_info=exc) + + async def _run(self) -> None: + try: + while True: + await asyncio.sleep(self._sample_interval) + await self._refresh() + except asyncio.CancelledError: + raise + except Exception as exc: # noqa: BLE001 - a sampler must never die silently + _logger.warning("server_metrics_sampler_stopped", exc_info=exc) diff --git a/src/pyfly/resources/pyfly-defaults.yaml b/src/pyfly/resources/pyfly-defaults.yaml index 85add6f8..ad1f772e 100644 --- a/src/pyfly/resources/pyfly-defaults.yaml +++ b/src/pyfly/resources/pyfly-defaults.yaml @@ -77,6 +77,14 @@ pyfly: runtime-threads: 1 runtime-mode: "auto" respawn-failed-workers: true + # Server-layer observability (worker/connection/uptime metrics + lifecycle). + # Emits server_* meters into the Prometheus registry, exposed at + # /actuator/prometheus and the admin Observability view. Enabled by the + # web/core starters; needs the observability extra (prometheus_client). 
+ observability: + enabled: true + sample-interval-seconds: 5.0 + access-log: false data: enabled: false url: "sqlite+aiosqlite:///pyfly.db" diff --git a/src/pyfly/server/adapters/granian/adapter.py b/src/pyfly/server/adapters/granian/adapter.py index 4c19713c..2f3e1f9a 100644 --- a/src/pyfly/server/adapters/granian/adapter.py +++ b/src/pyfly/server/adapters/granian/adapter.py @@ -15,8 +15,11 @@ from __future__ import annotations +import os +import time from typing import Any +from pyfly.server.ports.server_stats import ServerStats from pyfly.server.types import ServerInfo @@ -25,11 +28,17 @@ class GranianServerAdapter: Granian uses Rust's Hyper + Tokio for network I/O, achieving ~3x the throughput of Uvicorn with significantly lower tail latency. + + Implements the optional :class:`~pyfly.server.ports.server_stats.ServerStatsPort` + for workers + uptime only — Granian's Rust runtime exposes no Python-side + connection/request handle, so those fields are always ``None`` (the pure-ASGI + server-metrics middleware supplies them uniformly instead). 
""" def __init__(self) -> None: self._server: Any = None self._info: ServerInfo | None = None + self._serve_start_monotonic: float | None = None def serve(self, app: str | Any, config: Any) -> None: """Start Granian (blocking).""" @@ -87,6 +96,7 @@ def serve(self, app: str | Any, config: Any) -> None: host=host, port=port, ) + self.on_serve_start() server.serve() async def serve_async(self, app: str | Any, config: Any) -> None: @@ -111,6 +121,25 @@ def shutdown(self) -> None: """ self._server = None + # -- ServerStatsPort (best-effort) -------------------------------------- + + def on_serve_start(self) -> None: + """Record the server-bind moment (basis for ``server_uptime_seconds``).""" + self._serve_start_monotonic = time.monotonic() + + def on_serve_stop(self) -> None: + """No-op — Granian has no Python-side server handle to release.""" + + def sample(self) -> ServerStats: + """Workers + uptime only; connection/request fields are always ``None``.""" + workers = self._info.workers if self._info is not None else 1 + return ServerStats(workers=workers, server_uptime_seconds=self._uptime_seconds(), worker_pid=os.getpid()) + + def _uptime_seconds(self) -> float: + if self._serve_start_monotonic is None: + return 0.0 + return max(0.0, time.monotonic() - self._serve_start_monotonic) + @property def server_info(self) -> ServerInfo: if self._info is not None: diff --git a/src/pyfly/server/adapters/hypercorn/adapter.py b/src/pyfly/server/adapters/hypercorn/adapter.py index 6f900f50..9db6820d 100644 --- a/src/pyfly/server/adapters/hypercorn/adapter.py +++ b/src/pyfly/server/adapters/hypercorn/adapter.py @@ -16,8 +16,11 @@ from __future__ import annotations import asyncio +import os +import time from typing import Any +from pyfly.server.ports.server_stats import ServerStats from pyfly.server.types import ServerInfo @@ -26,11 +29,17 @@ class HypercornServerAdapter: The only mainstream Python ASGI server with HTTP/3 (QUIC) support. 
Also supports Trio as an alternative to asyncio. + + Implements the optional :class:`~pyfly.server.ports.server_stats.ServerStatsPort` + for workers + uptime only — ``hypercorn.asyncio.serve`` returns no server + object, so connection/request fields are always ``None`` (the pure-ASGI + server-metrics middleware supplies them uniformly instead). """ def __init__(self) -> None: self._shutdown_event: asyncio.Event | None = None self._info: ServerInfo | None = None + self._serve_start_monotonic: float | None = None def serve(self, app: str | Any, config: Any) -> None: """Start Hypercorn (blocking).""" @@ -74,6 +83,7 @@ async def serve_async(self, app: str | Any, config: Any) -> None: port=port, ) + self.on_serve_start() await serve(app, hc_config, shutdown_trigger=self._shutdown_event.wait) # type: ignore[arg-type,unused-ignore] def shutdown(self) -> None: @@ -81,6 +91,25 @@ def shutdown(self) -> None: if self._shutdown_event is not None: self._shutdown_event.set() + # -- ServerStatsPort (best-effort) -------------------------------------- + + def on_serve_start(self) -> None: + """Record the server-bind moment (basis for ``server_uptime_seconds``).""" + self._serve_start_monotonic = time.monotonic() + + def on_serve_stop(self) -> None: + """No-op — Hypercorn exposes no server handle to release.""" + + def sample(self) -> ServerStats: + """Workers + uptime only; connection/request fields are always ``None``.""" + workers = self._info.workers if self._info is not None else 1 + return ServerStats(workers=workers, server_uptime_seconds=self._uptime_seconds(), worker_pid=os.getpid()) + + def _uptime_seconds(self) -> float: + if self._serve_start_monotonic is None: + return 0.0 + return max(0.0, time.monotonic() - self._serve_start_monotonic) + @property def server_info(self) -> ServerInfo: if self._info is not None: diff --git a/src/pyfly/server/adapters/uvicorn/adapter.py b/src/pyfly/server/adapters/uvicorn/adapter.py index ea0a9eea..1a645033 100644 --- 
a/src/pyfly/server/adapters/uvicorn/adapter.py +++ b/src/pyfly/server/adapters/uvicorn/adapter.py @@ -15,23 +15,39 @@ from __future__ import annotations +import os +import time from typing import Any import uvicorn +from pyfly.server.ports.server_stats import ServerStats from pyfly.server.types import ServerInfo +# Process-global reference to the live ``uvicorn.Server`` when this process runs +# the server in-process (the ``serve_async`` embedding path). It lets the +# ServerMetricsBinder read native ``server_state`` even when it holds a different +# adapter *instance* than the one serving. It is NOT set on the forked +# ``uvicorn.run(workers=N)`` production path — each worker builds its own Server +# inside uvicorn, so the binder falls back to the pure-ASGI middleware there. +_active_server: Any = None + class UvicornServerAdapter: """ApplicationServerPort implementation backed by Uvicorn. The most widely used Python ASGI server. Uses httptools + uvloop for optimal performance when ``uvicorn[standard]`` is installed. + + Also implements the optional :class:`~pyfly.server.ports.server_stats.ServerStatsPort` + (best-effort): on the ``serve_async`` path it surfaces uvicorn's true socket + connection count and total requests from ``Server.server_state``. 
""" def __init__(self) -> None: self._server: Any = None self._info: ServerInfo | None = None + self._serve_start_monotonic: float | None = None @staticmethod def _build_kwargs(host: str, port: int, loop: str, config: Any) -> dict[str, Any]: @@ -102,13 +118,62 @@ async def serve_async(self, app: str | Any, config: Any) -> None: host=host, port=port, ) - await server.serve() + global _active_server + _active_server = server + self.on_serve_start() + try: + await server.serve() + finally: + self.on_serve_stop() def shutdown(self) -> None: """Request graceful shutdown.""" if self._server is not None: self._server.should_exit = True + # -- ServerStatsPort (best-effort) -------------------------------------- + + def on_serve_start(self) -> None: + """Record the server-bind moment (basis for ``server_uptime_seconds``).""" + self._serve_start_monotonic = time.monotonic() + + def on_serve_stop(self) -> None: + """Clear the process-global live-server reference.""" + global _active_server + if _active_server is self._server: + _active_server = None + + def sample(self) -> ServerStats | None: + """Sample live uvicorn stats when a server runs in this process. + + Reads ``Server.server_state`` (total requests + the live connection set) + from this adapter's own server or, failing that, the process-global + ``_active_server``. Returns ``None`` connection/request fields when no + in-process server handle is available (the forked production path). 
+ """ + srv = self._server or _active_server + active_connections: int | None = None + total_requests: int | None = None + state = getattr(srv, "server_state", None) + if state is not None: + conns = getattr(state, "connections", None) + if conns is not None: + active_connections = len(conns) + total_requests = getattr(state, "total_requests", None) + workers = self._info.workers if self._info is not None else 1 + return ServerStats( + workers=workers, + server_uptime_seconds=self._uptime_seconds(), + worker_pid=os.getpid(), + active_connections=active_connections, + total_requests=total_requests, + ) + + def _uptime_seconds(self) -> float: + if self._serve_start_monotonic is None: + return 0.0 + return max(0.0, time.monotonic() - self._serve_start_monotonic) + @property def server_info(self) -> ServerInfo: if self._info is not None: diff --git a/src/pyfly/server/ports/server_stats.py b/src/pyfly/server/ports/server_stats.py new file mode 100644 index 00000000..ee1b65ca --- /dev/null +++ b/src/pyfly/server/ports/server_stats.py @@ -0,0 +1,72 @@ +# Copyright 2026 Firefly Software Foundation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Outbound port: best-effort server runtime statistics. + +:class:`ServerStatsPort` is an *optional* capability a server adapter may +implement to expose live, server-native runtime numbers (true socket +connections, total requests served) that the ASGI layer cannot see. 
It mirrors +the :class:`~pyfly.observability.ports.MetricsRecorder` / ``NoOp`` philosophy: +**a ``None`` field means "this server cannot report it"**, so the binder that +consumes a sample never has to guard per-field. + +Only servers whose adapter owns the running server object in-process (Uvicorn via +``serve_async``) can populate ``active_connections`` / ``total_requests``. On the +forked ``pyfly run`` production path (``uvicorn.run(workers=N)``), the worker's +adapter bean is not the object running the server, so ``sample()`` returns those +fields as ``None`` and the pure-ASGI server-metrics middleware supplies the +uniform connection/in-flight numbers instead. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Protocol, runtime_checkable + + +@dataclass(frozen=True) +class ServerStats: + """A point-in-time sample of a server's runtime state. + + ``active_connections`` and ``total_requests`` are ``None`` when the server + exposes no Python-side handle to read them (Granian's Rust runtime, + Hypercorn's ``serve`` returning nothing, or any forked worker). + """ + + workers: int + server_uptime_seconds: float + worker_pid: int + active_connections: int | None = None + total_requests: int | None = None + + +@runtime_checkable +class ServerStatsPort(Protocol): + """Optional server-statistics capability implemented by server adapters. + + Analogous to Micrometer's ``TomcatMetrics`` binder source — but best-effort: + a server that cannot introspect itself still satisfies the protocol by + returning ``None`` fields (or ``None`` from :meth:`sample`). + """ + + def sample(self) -> ServerStats | None: + """Return a current :class:`ServerStats`, or ``None`` if unavailable.""" + ... + + def on_serve_start(self) -> None: + """Record the server-bind moment (basis for ``server_uptime_seconds``).""" + ... + + def on_serve_stop(self) -> None: + """Release any retained server handle/reference.""" + ... 
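Reviewer sketch (not part of the patch): the `ServerStatsPort` contract in `server_stats.py` can be illustrated in isolation as follows. This is a minimal sketch under stated assumptions: `ServerStats` and `ServerStatsPort` are re-declared locally so the block runs without pyfly installed, and `OpaqueServerAdapter` and `native_connections` are hypothetical names invented for illustration. It shows how an adapter with no Python-side server handle can still satisfy the protocol by leaving the connection/request fields as `None`, and how a consumer might read a sample defensively.

```python
import os
import time
from dataclasses import dataclass
from typing import Optional, Protocol, runtime_checkable


@dataclass(frozen=True)
class ServerStats:
    # Local copy mirroring the dataclass in the diff (assumption: same fields).
    workers: int
    server_uptime_seconds: float
    worker_pid: int
    active_connections: Optional[int] = None
    total_requests: Optional[int] = None


@runtime_checkable
class ServerStatsPort(Protocol):
    # Local copy of the protocol; isinstance() only checks that the methods exist.
    def sample(self) -> Optional[ServerStats]: ...
    def on_serve_start(self) -> None: ...
    def on_serve_stop(self) -> None: ...


class OpaqueServerAdapter:
    """Hypothetical adapter for a server that exposes no Python-side handle."""

    def __init__(self, workers: int = 1) -> None:
        self._workers = workers
        self._start: Optional[float] = None

    def on_serve_start(self) -> None:
        # Record the bind moment; uptime is measured from here.
        self._start = time.monotonic()

    def on_serve_stop(self) -> None:
        self._start = None

    def sample(self) -> ServerStats:
        uptime = 0.0 if self._start is None else max(0.0, time.monotonic() - self._start)
        # Connection/request fields keep their None defaults: "cannot report".
        return ServerStats(workers=self._workers, server_uptime_seconds=uptime, worker_pid=os.getpid())


def native_connections(adapter: object) -> Optional[int]:
    """Return the native connection count, or None when it cannot be reported."""
    if not isinstance(adapter, ServerStatsPort):
        return None
    sample = adapter.sample()
    if sample is None:
        return None
    return sample.active_connections
```

In this sketch the consumer treats "adapter does not implement the port", "sample is `None`", and "field is `None`" identically, which matches how the binder in this patch only sets `server_native_connections` when a value is actually present.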
diff --git a/src/pyfly/starters/core.py b/src/pyfly/starters/core.py index 22c68fd1..d4491b96 100644 --- a/src/pyfly/starters/core.py +++ b/src/pyfly/starters/core.py @@ -64,6 +64,8 @@ class Application: # Web tier — Starlette/FastAPI auto-config + ASGI server selection "pyfly.web.enabled": "true", "pyfly.server.enabled": "true", + # Server-layer observability — worker/connection/uptime metrics + lifecycle + "pyfly.server.observability.enabled": "true", # Observability — Prometheus + OpenTelemetry "pyfly.observability.enabled": "true", "pyfly.metrics.enabled": "true", diff --git a/src/pyfly/starters/web.py b/src/pyfly/starters/web.py index b3ab7a16..1c871a39 100644 --- a/src/pyfly/starters/web.py +++ b/src/pyfly/starters/web.py @@ -67,6 +67,8 @@ class Application: "pyfly.web.enabled": "true", # ASGI server — Granian / Uvicorn / Hypercorn auto-detected. "pyfly.server.enabled": "true", + # Server-layer observability — worker/connection/uptime metrics + lifecycle. + "pyfly.server.observability.enabled": "true", # Observability — metrics + tracing. "pyfly.observability.enabled": "true", "pyfly.metrics.enabled": "true", diff --git a/src/pyfly/web/adapters/fastapi/app.py b/src/pyfly/web/adapters/fastapi/app.py index 1e3e166e..4bbd8608 100644 --- a/src/pyfly/web/adapters/fastapi/app.py +++ b/src/pyfly/web/adapters/fastapi/app.py @@ -119,6 +119,23 @@ def create_app( except (ImportError, AssertionError): metrics_filter_instance = None + # Server-layer observability (pyfly.server.observability.*) — see the matching + # block in pyfly.web.adapters.starlette.app for the full rationale. The pure-ASGI + # server-metrics middleware (added OUTERMOST below) is the primary source; the + # ServerMetricsBinder (started in the lifespan) adds worker/uptime/lifecycle meters. 
+ server_obs_enabled = False + server_obs_interval = 5.0 + if context is not None: + server_obs_enabled = str(context.config.get("pyfly.server.observability.enabled", "true")).lower() in ( + "true", + "1", + "yes", + ) + try: + server_obs_interval = float(context.config.get("pyfly.server.observability.sample-interval-seconds", 5.0)) + except (TypeError, ValueError): + server_obs_interval = 5.0 + # Resolve actuator state early so the httpexchanges recorder filter can join. from pyfly.actuator.wiring import make_http_exchange_filter, resolve_actuator_active @@ -220,6 +237,12 @@ def _install_user_filters() -> None: # rejects the credential-less preflight with 401 and the browser blocks the # real request. Starlette applies middleware outermost first. middleware: list[Middleware] = [] + # Server-metrics counter is the OUTERMOST middleware so it observes every raw + # ASGI connection (incl. websockets + CORS preflights) before any other layer. + if server_obs_enabled: + from pyfly.web.adapters.starlette.asgi_server_metrics import ServerMetricsASGIMiddleware + + middleware.append(Middleware(ServerMetricsASGIMiddleware, enabled=True)) if cors is not None: from starlette.middleware.cors import CORSMiddleware @@ -416,6 +439,13 @@ async def _lifespan_with_dynamic_wiring(app_: FastAPI) -> AsyncIterator[None]: async with _inner_lifespan_ctx(app_): _install_dynamic_wiring() mgmt_server = None + server_binder = None + if server_obs_enabled: + from pyfly.observability.server_metrics import build_binder_for_context + + server_binder = build_binder_for_context(context, sample_interval=server_obs_interval) + if server_binder is not None: + await server_binder.start() try: if _serve_separately and management_props is not None and context is not None: from pyfly.config.properties.server import resolve_app_host @@ -440,6 +470,8 @@ async def _lifespan_with_dynamic_wiring(app_: FastAPI) -> AsyncIterator[None]: await mgmt_server.start() yield finally: + if server_binder is not None: + 
await server_binder.stop() if mgmt_server is not None: await mgmt_server.stop() diff --git a/src/pyfly/web/adapters/starlette/app.py b/src/pyfly/web/adapters/starlette/app.py index 7c712ab7..cae5aca7 100644 --- a/src/pyfly/web/adapters/starlette/app.py +++ b/src/pyfly/web/adapters/starlette/app.py @@ -131,6 +131,26 @@ def create_app( except (ImportError, AssertionError): metrics_filter_instance = None # prometheus_client not installed + # Server-layer observability (pyfly.server.observability.*). The pure-ASGI + # server-metrics middleware (added as the OUTERMOST middleware below) is the + # PRIMARY source for live connection / in-flight / request gauges — it runs + # in every worker, for every server and worker count, unlike a server's native + # stats which on the forked uvicorn.run path live in objects the in-worker + # lifespan cannot reach. The ServerMetricsBinder (started in the lifespan) + # adds worker / uptime / lifecycle meters. Both no-op without prometheus_client. + server_obs_enabled = False + server_obs_interval = 5.0 + if context is not None: + server_obs_enabled = str(context.config.get("pyfly.server.observability.enabled", "true")).lower() in ( + "true", + "1", + "yes", + ) + try: + server_obs_interval = float(context.config.get("pyfly.server.observability.sample-interval-seconds", 5.0)) + except (TypeError, ValueError): + server_obs_interval = 5.0 + # Resolve actuator state early so the httpexchanges recorder filter (if any) # can join the chain alongside the metrics filter and trace collector. from pyfly.actuator.wiring import make_http_exchange_filter, resolve_actuator_active @@ -245,6 +265,12 @@ def _install_user_filters() -> int: # the real request ("Load failed"). Starlette applies middleware outermost # first, so CORS is prepended ahead of WebFilterChainMiddleware. middleware: list[Middleware] = [] + # Server-metrics counter is the OUTERMOST middleware so it observes every raw + # ASGI connection (incl. 
websockets + CORS preflights) before any other layer. + if server_obs_enabled: + from pyfly.web.adapters.starlette.asgi_server_metrics import ServerMetricsASGIMiddleware + + middleware.append(Middleware(ServerMetricsASGIMiddleware, enabled=True)) if cors is not None: from starlette.middleware.cors import CORSMiddleware @@ -424,6 +450,13 @@ async def _lifespan_with_dynamic_wiring(app_: Starlette) -> AsyncIterator[None]: async with _user_lifespan(app_): # type: ignore[operator] _install_dynamic_wiring(app_) # beans are now instantiated mgmt_server = None + server_binder = None + if server_obs_enabled: + from pyfly.observability.server_metrics import build_binder_for_context + + server_binder = build_binder_for_context(context, sample_interval=server_obs_interval) + if server_binder is not None: + await server_binder.start() try: if _serve_separately and management_props is not None and context is not None: from pyfly.config.properties.server import resolve_app_host @@ -448,6 +481,8 @@ async def _lifespan_with_dynamic_wiring(app_: Starlette) -> AsyncIterator[None]: await mgmt_server.start() yield finally: + if server_binder is not None: + await server_binder.stop() if mgmt_server is not None: await mgmt_server.stop() diff --git a/src/pyfly/web/adapters/starlette/asgi_server_metrics.py b/src/pyfly/web/adapters/starlette/asgi_server_metrics.py new file mode 100644 index 00000000..b5a20173 --- /dev/null +++ b/src/pyfly/web/adapters/starlette/asgi_server_metrics.py @@ -0,0 +1,139 @@ +# Copyright 2026 Firefly Software Foundation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +"""Pure-ASGI server-metrics middleware — the uniform server-observability floor. + +This wraps the application at the OUTERMOST layer (below Starlette's middleware +stack), so it sees the raw ASGI ``scope`` for every connection — including +websockets and requests the HTTP ``WebFilter`` chain never sees (it short-circuits +on ``scope["type"] != "http"``). It is the PRIMARY source for live server numbers +because it runs in-process in **every** worker, identically for uvicorn / granian +/ hypercorn and for any worker count (unlike a server's native stats, which on the +forked ``uvicorn.run(workers=N)`` path live in objects the in-worker lifespan +cannot reach). + +Emits (single writer per metric; labels ``server`` + ``worker_pid``): + + ``server_active_connections`` (gauge) — open http+websocket scopes + ``server_in_flight_requests`` (gauge) — http scopes currently handled + ``server_requests_total`` (counter) — completed http scopes + +``active_connections`` here is an ASGI-scope count, NOT a true socket count +(persistent keep-alive sockets the server holds idle are invisible to ASGI); the +uvicorn ``ServerStatsPort`` surfaces the true socket count separately. +""" + +from __future__ import annotations + +import os +from typing import Any + +try: + from prometheus_client import Counter, Gauge + + _HAS_PROMETHEUS = True +except ImportError: # pragma: no cover - exercised only without the observability extra + Counter = None # type: ignore[assignment,misc] + Gauge = None # type: ignore[assignment,misc] + _HAS_PROMETHEUS = False + +_ACTIVE_METRIC = "server_active_connections" +_IN_FLIGHT_METRIC = "server_in_flight_requests" +# prometheus_client appends ``_total`` to a Counter name itself. 
+_REQUESTS_METRIC = "server_requests" +_LABELS = ["server", "worker_pid"] + +# Long-lived SSE streams must not be counted — they would pin the gauges up for +# the lifetime of the stream (the admin Observability view itself is an SSE +# stream). Matched as a path SUBSTRING so a non-default admin path +# (pyfly.admin.path, e.g. /dashboard/api/sse/...) is excluded too. +_EXCLUDED_SUBSTRINGS = ("/api/sse/",) + +# Process-global collectors (created once per process, like the request timer). +_active: Any = None +_in_flight: Any = None +_requests: Any = None + + +def _get_server_collectors() -> tuple[Any, Any, Any]: + """Get-or-create the process-global server gauges + request counter. + + Gauges use ``multiprocess_mode="livesum"`` so that, under prometheus_client + multiprocess mode, a scrape sums the live per-worker values (the value is + harmless in single-process mode). + """ + global _active, _in_flight, _requests + if _active is None: + _active = Gauge( + _ACTIVE_METRIC, "Open ASGI connections (http + websocket)", _LABELS, multiprocess_mode="livesum" + ) + _in_flight = Gauge(_IN_FLIGHT_METRIC, "In-flight HTTP requests", _LABELS, multiprocess_mode="livesum") + _requests = Counter(_REQUESTS_METRIC, "Total HTTP requests handled at the server layer", _LABELS) + return _active, _in_flight, _requests + + +def reset_collectors() -> None: + """Unregister and drop the global collectors. 
Test-support only.""" + global _active, _in_flight, _requests + import contextlib + + from prometheus_client import REGISTRY + + for collector in (_active, _in_flight, _requests): + if collector is not None: + with contextlib.suppress(KeyError, ValueError): + REGISTRY.unregister(collector) + _active = None + _in_flight = None + _requests = None + + +def _server_label() -> str: + """The ``server`` label value — the configured server type, like the logs.""" + return os.environ.get("_PYFLY_SERVER_TYPE", "unknown") + + +class ServerMetricsASGIMiddleware: + """Outermost pure-ASGI middleware counting connections + in-flight requests.""" + + def __init__(self, app: Any, *, enabled: bool = True) -> None: + self.app = app + self._enabled = enabled and _HAS_PROMETHEUS + if self._enabled: + self._active, self._in_flight, self._requests = _get_server_collectors() + self._label_values = (_server_label(), str(os.getpid())) + + async def __call__(self, scope: Any, receive: Any, send: Any) -> None: + typ = scope.get("type") + if not self._enabled or typ not in ("http", "websocket"): + await self.app(scope, receive, send) + return + + path = scope.get("path", "") + if any(s in path for s in _EXCLUDED_SUBSTRINGS): + await self.app(scope, receive, send) + return + + active = self._active.labels(*self._label_values) + active.inc() + in_flight = None + if typ == "http": + in_flight = self._in_flight.labels(*self._label_values) + in_flight.inc() + try: + await self.app(scope, receive, send) + finally: + active.dec() + if in_flight is not None: + in_flight.dec() + self._requests.labels(*self._label_values).inc() diff --git a/tests/admin/test_observability_provider.py b/tests/admin/test_observability_provider.py new file mode 100644 index 00000000..34824986 --- /dev/null +++ b/tests/admin/test_observability_provider.py @@ -0,0 +1,121 @@ +# Copyright 2026 Firefly Software Foundation. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests for the admin ObservabilityProvider.""" + +from __future__ import annotations + +import pytest + +from pyfly.admin.providers.observability_provider import ObservabilityProvider +from pyfly.observability import server_metrics as sm +from pyfly.web.adapters.starlette import asgi_server_metrics as asm + + +@pytest.fixture(autouse=True) +def _fresh_collectors(): + asm.reset_collectors() + sm.reset_collectors() + yield + asm.reset_collectors() + sm.reset_collectors() + + +class TestObservabilityProvider: + async def test_snapshot_shape_with_no_metrics(self) -> None: + snap = await ObservabilityProvider().get_observability() + # Shape contract holds even with nothing registered yet. 
+ for key in ( + "timestamp", + "available", + "multiprocess", + "server", + "workers", + "uptime_seconds", + "in_flight_requests", + "requests_per_second", + "per_worker", + "lifecycle", + ): + assert key in snap + assert snap["multiprocess"] is False + assert snap["per_worker"] == [] + + async def test_reads_registered_server_metrics(self) -> None: + binder = sm.ServerMetricsBinder(server_name="uvicorn", workers=2, sample_interval=60) + await binder.start() + try: + snap = await ObservabilityProvider().get_observability() + assert snap["workers"] == 1 # one reporting worker (this process) + assert snap["uptime_seconds"] >= 0.0 + assert snap["started_total"] >= 1 + assert len(snap["per_worker"]) == 1 + assert snap["per_worker"][0]["pid"] == str(__import__("os").getpid()) + finally: + await binder.stop() + + async def test_requests_total_from_asgi_middleware(self) -> None: + from pyfly.web.adapters.starlette.asgi_server_metrics import ServerMetricsASGIMiddleware + + async def _noop(scope, receive, send): # noqa: ANN001 + pass + + mw = ServerMetricsASGIMiddleware(_noop) + await mw({"type": "http", "path": "/x"}, lambda: None, lambda m: None) + + snap = await ObservabilityProvider().get_observability() + assert snap["requests_total"] == 1 + + +class TestDisabledAndEdgeCases: + async def test_disabled_flag_reports_unavailable(self) -> None: + from types import SimpleNamespace + + ctx = SimpleNamespace( + config=SimpleNamespace(get=lambda key, default=None: "false"), + container=SimpleNamespace(_registrations={}), + ) + snap = await ObservabilityProvider(context=ctx).get_observability() + assert snap["available"] is False + + async def test_worker_row_keeps_zero_native_connections(self) -> None: + row = ObservabilityProvider._worker_row({"pid": "1", "server_native_connections": 0.0}) + # A real 0 must stay 0 (not collapse to None / "n/a"). 
+ assert row["native_connections"] == 0 + + async def test_worker_row_none_native_connections_stays_none(self) -> None: + row = ObservabilityProvider._worker_row({"pid": "1"}) + assert row["native_connections"] is None + + async def test_snapshot_requests_per_second_is_neutral_default(self) -> None: + # The provider no longer keeps shared rps state; the stream computes it. + snap = await ObservabilityProvider().get_observability() + assert snap["requests_per_second"] == 0.0 + + +class TestObservabilityStream: + async def test_stream_emits_observability_event(self) -> None: + import json + + from pyfly.admin.api.sse import observability_stream + + gen = observability_stream(ObservabilityProvider(), interval=0.01) + try: + event = await anext(gen) + finally: + await gen.aclose() + + assert "event: observability" in event + payload = json.loads(next(line[len("data: ") :] for line in event.splitlines() if line.startswith("data: "))) + assert payload["available"] is True + assert "server" in payload diff --git a/tests/config/test_server_properties.py b/tests/config/test_server_properties.py index 11cdc190..304bda4c 100644 --- a/tests/config/test_server_properties.py +++ b/tests/config/test_server_properties.py @@ -15,7 +15,11 @@ from __future__ import annotations -from pyfly.config.properties.server import GranianProperties, ServerProperties +from pyfly.config.properties.server import ( + GranianProperties, + ServerObservabilityProperties, + ServerProperties, +) class TestServerProperties: @@ -55,3 +59,25 @@ def test_custom_values(self): def test_has_config_properties_prefix(self): assert hasattr(ServerProperties, "__pyfly_config_prefix__") assert ServerProperties.__pyfly_config_prefix__ == "pyfly.server" + + def test_observability_defaults(self): + props = ServerProperties() + assert props.observability.enabled is True + assert props.observability.sample_interval_seconds == 5.0 + assert props.observability.access_log is False + + def 
test_observability_custom_values(self): + props = ServerProperties( + observability=ServerObservabilityProperties(enabled=False, sample_interval_seconds=2.0, access_log=True) + ) + assert props.observability.enabled is False + assert props.observability.sample_interval_seconds == 2.0 + assert props.observability.access_log is True + + +class TestServerObservabilityProperties: + def test_defaults(self): + obs = ServerObservabilityProperties() + assert obs.enabled is True + assert obs.sample_interval_seconds == 5.0 + assert obs.access_log is False diff --git a/tests/observability/test_multiprocess.py b/tests/observability/test_multiprocess.py new file mode 100644 index 00000000..613aab63 --- /dev/null +++ b/tests/observability/test_multiprocess.py @@ -0,0 +1,95 @@ +# Copyright 2026 Firefly Software Foundation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+"""Tests for prometheus multiprocess-mode support.""" + +from __future__ import annotations + +import os + +import pytest + +from pyfly.observability import multiprocess as mp + + +class TestIsMultiprocess: + def test_false_when_env_unset(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.delenv("PROMETHEUS_MULTIPROC_DIR", raising=False) + assert mp.is_multiprocess() is False + + def test_true_when_env_set(self, monkeypatch: pytest.MonkeyPatch, tmp_path) -> None: + monkeypatch.setenv("PROMETHEUS_MULTIPROC_DIR", str(tmp_path)) + assert mp.is_multiprocess() is True + + +class TestInitMultiprocessDir: + def test_single_worker_is_noop(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.delenv("PROMETHEUS_MULTIPROC_DIR", raising=False) + assert mp.init_multiprocess_dir(1) is None + assert "PROMETHEUS_MULTIPROC_DIR" not in os.environ + + def test_multi_worker_creates_and_sets_dir(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.delenv("PROMETHEUS_MULTIPROC_DIR", raising=False) + try: + path = mp.init_multiprocess_dir(4) + assert path is not None + assert os.path.isdir(path) + assert os.environ["PROMETHEUS_MULTIPROC_DIR"] == path + finally: + os.environ.pop("PROMETHEUS_MULTIPROC_DIR", None) + + def test_respects_preexisting_dir(self, monkeypatch: pytest.MonkeyPatch, tmp_path) -> None: + monkeypatch.setenv("PROMETHEUS_MULTIPROC_DIR", str(tmp_path)) + assert mp.init_multiprocess_dir(4) == str(tmp_path) + + +class TestMultiprocessAggregation: + """Cross-process aggregation. prometheus_client fixes its value class at import + time, so a metric must be created in a *fresh* process (with the env var set) + to be mmap-backed. 
A child process writes the file; the parent aggregates it + via ``MultiProcessCollector`` reading the shared dir — exactly the + worker→scrape path.""" + + @staticmethod + def _write_metric_in_child(multiproc_dir: str, metric: str, value: int) -> None: + import subprocess + import sys + + env = dict(os.environ) + env["PROMETHEUS_MULTIPROC_DIR"] = multiproc_dir + script = f"from prometheus_client import Counter\nCounter({metric!r}, 'x').inc({value})\n" + subprocess.run([sys.executable, "-c", script], check=True, env=env) + + def test_registry_aggregates_metric_written_by_another_process( + self, monkeypatch: pytest.MonkeyPatch, tmp_path + ) -> None: + monkeypatch.setenv("PROMETHEUS_MULTIPROC_DIR", str(tmp_path)) + self._write_metric_in_child(str(tmp_path), "pyfly_mp_test_events", 3) + + from prometheus_client import generate_latest + + exposition = generate_latest(mp.build_multiprocess_registry()).decode() + assert "pyfly_mp_test_events_total" in exposition + assert "3.0" in exposition + + async def test_prometheus_endpoint_uses_multiprocess_registry( + self, monkeypatch: pytest.MonkeyPatch, tmp_path + ) -> None: + monkeypatch.setenv("PROMETHEUS_MULTIPROC_DIR", str(tmp_path)) + self._write_metric_in_child(str(tmp_path), "pyfly_mp_endpoint_events", 1) + + from pyfly.actuator.endpoints.prometheus_endpoint import PrometheusEndpoint + + result = await PrometheusEndpoint().handle() + assert result.get("status") != 503 + assert "pyfly_mp_endpoint_events_total" in result["body"] diff --git a/tests/observability/test_server_metrics.py b/tests/observability/test_server_metrics.py new file mode 100644 index 00000000..8ee5891b --- /dev/null +++ b/tests/observability/test_server_metrics.py @@ -0,0 +1,158 @@ +# Copyright 2026 Firefly Software Foundation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests for ServerMetricsBinder.""" + +from __future__ import annotations + +import asyncio + +import pytest +from prometheus_client import REGISTRY, generate_latest + +from pyfly.observability import server_metrics as sm +from pyfly.observability.server_metrics import ServerMetricsBinder, resolve_worker_count +from pyfly.server.ports.server_stats import ServerStats + + +@pytest.fixture(autouse=True) +def _fresh_collectors(): + sm.reset_collectors() + yield + sm.reset_collectors() + + +class _StubStatsPort: + def __init__(self, active: int | None) -> None: + self._active = active + + def sample(self) -> ServerStats: + return ServerStats(workers=2, server_uptime_seconds=1.0, worker_pid=1, active_connections=self._active) + + def on_serve_start(self) -> None: # pragma: no cover - not exercised here + pass + + def on_serve_stop(self) -> None: # pragma: no cover - not exercised here + pass + + +class TestServerMetricsBinder: + async def test_start_registers_meters(self) -> None: + binder = ServerMetricsBinder(server_name="uvicorn", workers=3, sample_interval=60) + await binder.start() + try: + exposition = generate_latest(REGISTRY).decode() + assert "server_workers" in exposition + assert "server_uptime_seconds" in exposition + assert "server_started_total" in exposition + assert 'server="uvicorn"' in exposition + workers_value = sm._get_binder_collectors()["workers"].labels(*binder._labels)._value.get() + assert workers_value == 3.0 + finally: + await binder.stop() + + async def test_uptime_advances(self) -> None: + binder = 
ServerMetricsBinder(server_name="uvicorn", workers=1, sample_interval=60) + await binder.start() + try: + await asyncio.sleep(0.02) + await binder._refresh() + value = sm._get_binder_collectors()["uptime"].labels(*binder._labels)._value.get() + assert value > 0.0 + finally: + await binder.stop() + + async def test_stop_marks_stopped_and_cancels_task(self) -> None: + binder = ServerMetricsBinder(server_name="uvicorn", workers=1, sample_interval=0.01) + await binder.start() + await binder.stop() + assert binder._task is None + value = sm._get_binder_collectors()["stopped"].labels(*binder._labels)._value.get() + assert value == 1.0 + + async def test_native_connections_set_when_sample_provides(self) -> None: + binder = ServerMetricsBinder( + server_name="uvicorn", workers=1, stats_port=_StubStatsPort(active=11), sample_interval=60 + ) + await binder.start() + try: + value = sm._get_binder_collectors()["native_conns"].labels(*binder._labels)._value.get() + assert value == 11.0 + finally: + await binder.stop() + + async def test_native_connections_untouched_when_sample_none(self) -> None: + binder = ServerMetricsBinder( + server_name="granian", workers=1, stats_port=_StubStatsPort(active=None), sample_interval=60 + ) + await binder.start() + try: + value = sm._get_binder_collectors()["native_conns"].labels(*binder._labels)._value.get() + assert value == 0.0 + finally: + await binder.stop() + + async def test_stop_is_resilient_to_a_dead_sampling_task(self) -> None: + # A sampling task that ended with a non-cancellation exception must NOT + # prevent stop() from recording server_stopped_total or raise out of it. 
+ binder = ServerMetricsBinder(server_name="uvicorn", workers=1, sample_interval=60) + await binder.start() + binder._task.cancel() + with __import__("contextlib").suppress(BaseException): + await binder._task + + async def _boom() -> None: + raise ValueError("simulated dead sampler") + + failed = asyncio.ensure_future(_boom()) + with __import__("contextlib").suppress(BaseException): + await failed + binder._task = failed # type: ignore[assignment] + + await binder.stop() # must not raise + assert sm._get_binder_collectors()["stopped"].labels(*binder._labels)._value.get() == 1.0 + + async def test_refresh_never_raises_when_gauge_write_fails(self, monkeypatch: pytest.MonkeyPatch) -> None: + binder = ServerMetricsBinder(server_name="uvicorn", workers=1, sample_interval=60) + await binder.start() + try: + # Force a gauge write to blow up; _refresh must swallow it. + def _boom(*_a, **_k): + raise OSError("mmap full") + + monkeypatch.setattr(sm._get_binder_collectors()["uptime"], "labels", _boom) + await binder._refresh() # must not raise + finally: + await binder.stop() + + async def test_double_start_does_not_raise_duplicated_timeseries(self) -> None: + b1 = ServerMetricsBinder(server_name="uvicorn", workers=1, sample_interval=60) + b2 = ServerMetricsBinder(server_name="uvicorn", workers=1, sample_interval=60) + await b1.start() + await b2.start() # must reuse the process-global collectors, not re-register + await b1.stop() + await b2.stop() + + +class TestResolveWorkerCount: + def test_reads_env(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("_PYFLY_WORKERS", "4") + assert resolve_worker_count() == 4 + + def test_fallback_when_unset(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.delenv("_PYFLY_WORKERS", raising=False) + assert resolve_worker_count(fallback=2) == 2 + + def test_bad_value_falls_back(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("_PYFLY_WORKERS", "not-an-int") + assert 
resolve_worker_count(fallback=1) == 1 diff --git a/tests/server/test_server_observability_e2e.py b/tests/server/test_server_observability_e2e.py new file mode 100644 index 00000000..12a92102 --- /dev/null +++ b/tests/server/test_server_observability_e2e.py @@ -0,0 +1,136 @@ +# Copyright 2026 Firefly Software Foundation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""End-to-end: real uvicorn server + live server-observability metrics. + +Boots the app on an ephemeral port with ``UvicornServerAdapter.serve_async`` (the +in-process embedding path), fires real HTTP requests through it, and asserts the +``server_*`` meters move and are served in the Prometheus exposition. 
+""" + +from __future__ import annotations + +import asyncio +import contextlib +import socket +from typing import Any + +import httpx +import pytest +from prometheus_client import REGISTRY, generate_latest +from starlette.responses import JSONResponse +from starlette.routing import Route + +from pyfly.context.application_context import ApplicationContext +from pyfly.core.config import Config +from pyfly.observability import server_metrics as sm +from pyfly.server.adapters.uvicorn.adapter import UvicornServerAdapter +from pyfly.web.adapters.starlette import asgi_server_metrics as asm +from pyfly.web.adapters.starlette.app import create_app + +pytestmark = pytest.mark.asyncio + + +@pytest.fixture(autouse=True) +def _fresh_collectors(): + asm.reset_collectors() + sm.reset_collectors() + yield + asm.reset_collectors() + sm.reset_collectors() + + +def _free_port() -> int: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(("127.0.0.1", 0)) + return int(s.getsockname()[1]) + + +async def _wait_ready(port: int, path: str, attempts: int = 50) -> None: + async with httpx.AsyncClient() as client: + for _ in range(attempts): + with contextlib.suppress(Exception): + r = await client.get(f"http://127.0.0.1:{port}{path}", timeout=1.0) + if r.status_code == 200: + return + await asyncio.sleep(0.05) + raise AssertionError(f"server never became ready on :{port}{path}") + + +async def _hello(request: Any) -> JSONResponse: + return JSONResponse({"ok": True}) + + +async def test_server_metrics_move_under_real_traffic() -> None: + port = _free_port() + ctx = ApplicationContext( + Config({"pyfly": {"server": {"observability": {"enabled": True, "sample-interval-seconds": 0.05}}}}) + ) + + @contextlib.asynccontextmanager + async def _lifespan(app: Any): + await ctx.start() + try: + yield + finally: + await ctx.stop() + + app = create_app(context=ctx, docs_enabled=False, extra_routes=[Route("/hello", _hello)], lifespan=_lifespan) + + from 
pyfly.config.properties.server import ServerProperties + + config = ServerProperties() + config.host = "127.0.0.1" + config.port = port + + adapter = UvicornServerAdapter() + task = asyncio.create_task(adapter.serve_async(app, config)) + try: + await _wait_ready(port, "/hello") + async with httpx.AsyncClient() as client: + for _ in range(5): + r = await client.get(f"http://127.0.0.1:{port}/hello", timeout=2.0) + assert r.status_code == 200 + + # Give the binder's sampling task a tick to publish uptime/workers. + await asyncio.sleep(0.1) + + exposition = generate_latest(REGISTRY).decode() + assert "server_requests_total" in exposition + assert "server_active_connections" in exposition + assert "server_in_flight_requests" in exposition + assert "server_workers" in exposition + assert "server_uptime_seconds" in exposition + + # server_requests_total must reflect the real traffic we sent. + total = sum( + sample.value + for metric in REGISTRY.collect() + if metric.name == "server_requests" + for sample in metric.samples + if sample.name == "server_requests_total" + ) + assert total >= 5.0 + + # The /actuator/prometheus exposition path serves the server_* meters. + from pyfly.actuator.endpoints.prometheus_endpoint import PrometheusEndpoint + + body = (await PrometheusEndpoint().handle())["body"] + assert "server_requests_total" in body + assert "server_workers" in body + finally: + adapter.shutdown() + with contextlib.suppress(TimeoutError, asyncio.CancelledError): + await asyncio.wait_for(task, timeout=8.0) + if not task.done(): + task.cancel() diff --git a/tests/server/test_server_stats_port.py b/tests/server/test_server_stats_port.py new file mode 100644 index 00000000..07e89064 --- /dev/null +++ b/tests/server/test_server_stats_port.py @@ -0,0 +1,93 @@ +# Copyright 2026 Firefly Software Foundation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests for the ServerStatsPort protocol + ServerStats dataclass.""" + +from __future__ import annotations + +import os + +import pytest + +from pyfly.server.ports.server_stats import ServerStats, ServerStatsPort + + +class _FakeStatsServer: + """Duck-typed ServerStatsPort.""" + + def sample(self) -> ServerStats | None: + return ServerStats(workers=1, server_uptime_seconds=1.0, worker_pid=os.getpid()) + + def on_serve_start(self) -> None: + pass + + def on_serve_stop(self) -> None: + pass + + +class _NotAStatsServer: + def sample(self) -> None: # missing on_serve_start/on_serve_stop + return None + + +class TestServerStatsPort: + def test_duck_typed_class_is_stats_port(self) -> None: + assert isinstance(_FakeStatsServer(), ServerStatsPort) + + def test_incomplete_class_is_not_stats_port(self) -> None: + assert not isinstance(_NotAStatsServer(), ServerStatsPort) + + +class TestServerStats: + def test_optional_connection_fields_default_to_none(self) -> None: + stats = ServerStats(workers=4, server_uptime_seconds=12.5, worker_pid=123) + assert stats.active_connections is None + assert stats.total_requests is None + assert stats.workers == 4 + + def test_is_frozen(self) -> None: + stats = ServerStats(workers=1, server_uptime_seconds=0.0, worker_pid=1) + with pytest.raises(AttributeError): + stats.workers = 2 # type: ignore[misc] + + def test_populated_connection_fields(self) -> None: + stats = ServerStats(workers=1, server_uptime_seconds=3.0, worker_pid=1, active_connections=7, total_requests=42) + assert stats.active_connections == 7 + assert 
stats.total_requests == 42 + + +class TestGranianAdapterStats: + """Granian/Hypercorn adapters import without their server packages, so these + run regardless of what is installed.""" + + def test_granian_implements_port_and_samples_workers_only(self) -> None: + from pyfly.server.adapters.granian.adapter import GranianServerAdapter + + adapter = GranianServerAdapter() + assert isinstance(adapter, ServerStatsPort) + adapter.on_serve_start() + stats = adapter.sample() + assert stats.active_connections is None + assert stats.total_requests is None + assert stats.server_uptime_seconds >= 0.0 + + def test_hypercorn_implements_port_and_samples_workers_only(self) -> None: + from pyfly.server.adapters.hypercorn.adapter import HypercornServerAdapter + + adapter = HypercornServerAdapter() + assert isinstance(adapter, ServerStatsPort) + adapter.on_serve_start() + stats = adapter.sample() + assert stats.active_connections is None + assert stats.total_requests is None + assert stats.server_uptime_seconds >= 0.0 diff --git a/tests/server/test_uvicorn_adapter.py b/tests/server/test_uvicorn_adapter.py index ab2df451..f39c25f7 100644 --- a/tests/server/test_uvicorn_adapter.py +++ b/tests/server/test_uvicorn_adapter.py @@ -15,11 +15,13 @@ from __future__ import annotations +from types import SimpleNamespace from unittest.mock import patch from pyfly.config.properties.server import ServerProperties from pyfly.server.adapters.uvicorn.adapter import UvicornServerAdapter from pyfly.server.ports.outbound import ApplicationServerPort +from pyfly.server.ports.server_stats import ServerStats, ServerStatsPort from pyfly.server.types import ServerInfo @@ -47,3 +49,43 @@ def test_serve_calls_uvicorn_run(self, mock_uvicorn): def test_shutdown_does_not_raise(self): adapter = UvicornServerAdapter() adapter.shutdown() + + +class TestUvicornServerStats: + def test_implements_server_stats_port(self): + assert isinstance(UvicornServerAdapter(), ServerStatsPort) + + def 
test_sample_without_server_returns_none_connection_fields(self): + adapter = UvicornServerAdapter() + stats = adapter.sample() + assert isinstance(stats, ServerStats) + assert stats.active_connections is None + assert stats.total_requests is None + assert stats.workers == 1 + + def test_on_serve_start_makes_uptime_advance(self): + adapter = UvicornServerAdapter() + adapter.on_serve_start() + stats = adapter.sample() + assert stats.server_uptime_seconds >= 0.0 + + def test_sample_reads_server_state_when_present(self): + adapter = UvicornServerAdapter() + adapter._server = SimpleNamespace(server_state=SimpleNamespace(total_requests=5, connections={1, 2, 3})) + adapter._info = ServerInfo( + name="uvicorn", version="x", workers=4, event_loop="asyncio", http_protocol="h1", host="0.0.0.0", port=8000 + ) + stats = adapter.sample() + assert stats.active_connections == 3 + assert stats.total_requests == 5 + assert stats.workers == 4 + + def test_on_serve_stop_clears_global_active_server(self): + from pyfly.server.adapters.uvicorn import adapter as mod + + adapter = UvicornServerAdapter() + sentinel = SimpleNamespace(server_state=SimpleNamespace(total_requests=0, connections=set())) + adapter._server = sentinel + mod._active_server = sentinel + adapter.on_serve_stop() + assert mod._active_server is None diff --git a/tests/web/test_asgi_server_metrics.py b/tests/web/test_asgi_server_metrics.py new file mode 100644 index 00000000..cd1c2192 --- /dev/null +++ b/tests/web/test_asgi_server_metrics.py @@ -0,0 +1,97 @@ +# Copyright 2026 Firefly Software Foundation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests for the pure-ASGI server-metrics middleware.""" + +from __future__ import annotations + +import pytest +from prometheus_client import REGISTRY, generate_latest + +from pyfly.web.adapters.starlette import asgi_server_metrics as asm +from pyfly.web.adapters.starlette.asgi_server_metrics import ServerMetricsASGIMiddleware + + +@pytest.fixture(autouse=True) +def _fresh_collectors(): + asm.reset_collectors() + yield + asm.reset_collectors() + + +def _labels() -> tuple[str, str]: + import os + + return (asm._server_label(), str(os.getpid())) + + +async def _noop_app(scope, receive, send): # noqa: ANN001 + pass + + +async def _drive_http(mw: ServerMetricsASGIMiddleware, path: str = "/api/x") -> None: + await mw({"type": "http", "path": path}, lambda: None, lambda m: None) + + +class TestServerMetricsMiddleware: + async def test_counts_completed_http_request(self) -> None: + mw = ServerMetricsASGIMiddleware(_noop_app) + await _drive_http(mw) + # in-flight returns to zero; one request counted. 
+ assert mw._in_flight.labels(*_labels())._value.get() == 0.0 + assert mw._requests.labels(*_labels())._value.get() == 1.0 + exposition = generate_latest(REGISTRY).decode() + assert "server_requests_total" in exposition + assert "server_in_flight_requests" in exposition + assert "server_active_connections" in exposition + + async def test_in_flight_decrements_even_on_error(self) -> None: + async def boom(scope, receive, send): # noqa: ANN001 + raise RuntimeError("kaboom") + + mw = ServerMetricsASGIMiddleware(boom) + with pytest.raises(RuntimeError): + await _drive_http(mw) + assert mw._in_flight.labels(*_labels())._value.get() == 0.0 + assert mw._active.labels(*_labels())._value.get() == 0.0 + # a failed request still completed an http scope -> counted. + assert mw._requests.labels(*_labels())._value.get() == 1.0 + + async def test_websocket_counts_active_not_in_flight(self) -> None: + mw = ServerMetricsASGIMiddleware(_noop_app) + await mw({"type": "websocket", "path": "/ws"}, lambda: None, lambda m: None) + # websocket touches active-connections but not the http in-flight/requests. 
+ assert mw._active.labels(*_labels())._value.get() == 0.0 # back to 0 after close + assert mw._in_flight.labels(*_labels())._value.get() == 0.0 + assert mw._requests.labels(*_labels())._value.get() == 0.0 + + async def test_lifespan_scope_passes_through_untouched(self) -> None: + seen = {} + + async def app(scope, receive, send): # noqa: ANN001 + seen["type"] = scope["type"] + + mw = ServerMetricsASGIMiddleware(app) + await mw({"type": "lifespan"}, lambda: None, lambda m: None) + assert seen["type"] == "lifespan" + + async def test_excluded_sse_path_not_counted(self) -> None: + mw = ServerMetricsASGIMiddleware(_noop_app) + await _drive_http(mw, path="/admin/api/sse/observability") + assert mw._requests.labels(*_labels())._value.get() == 0.0 + + async def test_disabled_middleware_is_passthrough(self) -> None: + mw = ServerMetricsASGIMiddleware(_noop_app, enabled=False) + await _drive_http(mw) + # No collectors created/incremented when disabled. + assert asm._requests is None or mw._requests.labels(*_labels())._value.get() == 0.0 diff --git a/tests/web/test_server_observability_integration.py b/tests/web/test_server_observability_integration.py new file mode 100644 index 00000000..58d4d12c --- /dev/null +++ b/tests/web/test_server_observability_integration.py @@ -0,0 +1,113 @@ +# Copyright 2026 Firefly Software Foundation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+"""Integration: server observability wired into create_app's lifespan.""" + +from __future__ import annotations + +import contextlib +from typing import Any + +import pytest +from prometheus_client import REGISTRY, generate_latest +from starlette.responses import JSONResponse +from starlette.routing import Route +from starlette.testclient import TestClient + +from pyfly.context.application_context import ApplicationContext +from pyfly.core.config import Config +from pyfly.observability import server_metrics as sm +from pyfly.web.adapters.starlette import asgi_server_metrics as asm +from pyfly.web.adapters.starlette.app import create_app + + +@pytest.fixture(autouse=True) +def _reset_collectors(): + asm.reset_collectors() + sm.reset_collectors() + yield + asm.reset_collectors() + sm.reset_collectors() + + +async def _hello(request: Any) -> JSONResponse: + return JSONResponse({"ok": True}) + + +def _make_app(*, enabled: bool) -> tuple[Any, ApplicationContext]: + ctx = ApplicationContext( + Config( + { + "pyfly": { + "server": {"observability": {"enabled": enabled, "sample-interval-seconds": 60}}, + } + } + ) + ) + + @contextlib.asynccontextmanager + async def _lifespan(app: Any): + await ctx.start() + try: + yield + finally: + await ctx.stop() + + app = create_app( + context=ctx, + docs_enabled=False, + extra_routes=[Route("/hello", _hello)], + lifespan=_lifespan, + ) + return app, ctx + + +class TestServerObservabilityIntegration: + def test_enabled_registers_and_exposes_server_metrics(self) -> None: + app, _ctx = _make_app(enabled=True) + with TestClient(app) as client: + assert client.get("/hello").status_code == 200 + + exposition = generate_latest(REGISTRY).decode() + # Binder meters (worker/uptime/lifecycle). + assert "server_workers" in exposition + assert "server_uptime_seconds" in exposition + assert "server_started_total" in exposition + # ASGI middleware meters (connections/in-flight/requests). 
+ assert "server_active_connections" in exposition + assert "server_in_flight_requests" in exposition + assert "server_requests_total" in exposition + + def test_request_increments_server_requests_total(self) -> None: + app, _ctx = _make_app(enabled=True) + with TestClient(app) as client: + client.get("/hello") + client.get("/hello") + # The pure-ASGI middleware counted both completed http scopes. + total = sum( + sample.value + for metric in REGISTRY.collect() + if metric.name == "server_requests" + for sample in metric.samples + if sample.name == "server_requests_total" + ) + assert total >= 2.0 + + def test_disabled_does_not_register_server_metrics(self) -> None: + app, _ctx = _make_app(enabled=False) + with TestClient(app) as client: + client.get("/hello") + + exposition = generate_latest(REGISTRY).decode() + assert "server_workers" not in exposition + assert "server_active_connections" not in exposition
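Note on the middleware behavior exercised by tests/web/test_asgi_server_metrics.py: the sketch below is a minimal illustration of the counting pattern those tests assert on, not pyfly's actual `ServerMetricsASGIMiddleware`. The class name `InFlightCountingMiddleware`, the plain integer counters (standing in for Prometheus gauges and counters), and the `/admin/api/sse/` exclusion prefix are all assumptions made for illustration. It shows the `try`/`finally` shape that lets in-flight and active counts return to zero even when the wrapped app raises, with websocket scopes touching only the active count and lifespan scopes passing straight through.

```python
import asyncio


class InFlightCountingMiddleware:
    """Hypothetical pure-ASGI wrapper; plain ints stand in for Prometheus meters."""

    def __init__(self, app, *, enabled=True, excluded_prefixes=("/admin/api/sse/",)):
        self.app = app
        self.enabled = enabled
        # Assumed exclusion rule: skip long-lived SSE streams by path prefix.
        self.excluded_prefixes = excluded_prefixes
        self.active = 0          # open http + websocket scopes
        self.in_flight = 0       # open http scopes only
        self.requests_total = 0  # completed http scopes, including failed ones

    async def __call__(self, scope, receive, send):
        kind = scope["type"]
        # Lifespan (and a disabled middleware) pass through untouched.
        if not self.enabled or kind not in ("http", "websocket"):
            await self.app(scope, receive, send)
            return
        if scope.get("path", "").startswith(self.excluded_prefixes):
            await self.app(scope, receive, send)
            return

        is_http = kind == "http"
        self.active += 1
        if is_http:
            self.in_flight += 1
        try:
            await self.app(scope, receive, send)
        finally:
            # Runs on success and on error, so the gauges cannot leak upward.
            self.active -= 1
            if is_http:
                self.in_flight -= 1
                self.requests_total += 1


async def _ok(scope, receive, send):
    pass


async def _boom(scope, receive, send):
    raise RuntimeError("kaboom")


async def _receive():
    return {"type": "http.disconnect"}


async def _send(message):
    pass


async def demo():
    ok = InFlightCountingMiddleware(_ok)
    await ok({"type": "http", "path": "/api/x"}, _receive, _send)
    await ok({"type": "websocket", "path": "/ws"}, _receive, _send)
    await ok({"type": "lifespan"}, _receive, _send)
    await ok({"type": "http", "path": "/admin/api/sse/observability"}, _receive, _send)

    failing = InFlightCountingMiddleware(_boom)
    try:
        await failing({"type": "http", "path": "/api/x"}, _receive, _send)
    except RuntimeError:
        pass
    return ok, failing


ok, failing = asyncio.run(demo())
print(ok.requests_total, ok.in_flight, ok.active)
print(failing.requests_total, failing.in_flight, failing.active)
```

Counting the request in the `finally` branch is one way to match the test expectation that a failed request still counts as a completed http scope. A real implementation would presumably add the `server` and `worker_pid` labels described in the changelog.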