Skip to content

Commit 2e85d01

Browse files
committed
feat(gateway): add readiness probe metrics and test-only store close
Emit Prometheus readiness metrics for database probes (healthy gauge and outcome-labeled latency histogram) with coverage in health HTTP tests. Restrict Store::close behind test support cfg to prevent accidental runtime pool shutdown under live traffic. Signed-off-by: Adrien Langou <alangou@nvidia.com>
1 parent fafde3e commit 2e85d01

16 files changed

Lines changed: 764 additions & 34 deletions

File tree

architecture/gateway.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,12 @@ health, metrics, or tunnel routes. The plaintext service router also rejects
3737
browser requests whose Fetch Metadata, Origin, or Referer headers indicate a
3838
cross-origin or sibling-subdomain request.
3939

40+
Dedicated health listeners expose `/healthz` (process liveness only) and
41+
`/readyz` (dependency-aware readiness). Readiness reflects the latest result
42+
of an in-process background task that pings the persistence layer on a
43+
fixed cadence; the handler reads a cached state, so responses are
44+
sub-millisecond and never race the kubelet probe timeout.
45+
4046
Supported auth modes:
4147

4248
| Mode | Use |

crates/openshell-server/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ x509-parser = "0.16"
9595

9696
[features]
9797
dev-settings = ["openshell-core/dev-settings"]
98+
test-support = []
9899

99100
[dev-dependencies]
100101
hyper-rustls = { version = "0.27", default-features = false, features = ["native-tokio", "http1", "tls12", "logging", "ring", "webpki-tokio"] }

crates/openshell-server/src/http.rs

Lines changed: 273 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,19 @@
22
// SPDX-License-Identifier: Apache-2.0
33

44
//! HTTP health endpoints using Axum.
5+
//!
6+
//! Three endpoints with distinct semantics:
7+
//! - `/healthz` — Kubernetes liveness probe. Returns `200 OK` whenever the
8+
//! process is responsive. Intentionally does NOT depend on the database
9+
//! so a transient outage does not cascade into a `CrashLoopBackOff`.
10+
//! - `/readyz` — Kubernetes readiness probe. Reads the cached state
11+
//! published by [`crate::readiness::DatabaseHealthMonitor`] and returns
12+
//! `503 Service Unavailable` when the latest background check failed.
13+
//! Handler latency is sub-millisecond: the database is never pinged from
14+
//! inside the request path, so the response cannot race the kubelet's
15+
//! probe timeout.
16+
//! - `/health` — Alias of `/readyz` for external monitors
17+
//! that conventionally probe `/health`.
518
619
use axum::{
720
Json, Router,
@@ -14,43 +27,143 @@ use axum::{
1427
use metrics_exporter_prometheus::PrometheusHandle;
1528
use serde::Serialize;
1629
use std::sync::Arc;
30+
use tokio::sync::watch;
1731

18-
/// Health check response.
32+
use crate::persistence::Store;
33+
use crate::readiness::{DatabaseHealthMonitor, HealthError, HealthState};
34+
35+
const STATUS_HEALTHY: &str = "healthy";
36+
const STATUS_UNHEALTHY: &str = "unhealthy";
37+
const DATABASE_INITIALIZING_ERROR: &str = "readiness monitor still initializing";
38+
const DATABASE_UNAVAILABLE_ERROR: &str = "database unavailable";
39+
const DATABASE_TIMEOUT_ERROR: &str = "database health check timed out";
40+
41+
#[derive(Clone)]
42+
struct HealthRouterState {
43+
health: watch::Receiver<HealthState>,
44+
}
45+
46+
/// Per-dependency check entry exposed under `checks` in the JSON payload.
47+
#[derive(Debug, Serialize)]
48+
pub struct DependencyCheck {
49+
/// `"healthy"` or `"unhealthy"`.
50+
pub status: &'static str,
51+
52+
/// Wall-clock time of the latest background ping, when measurable.
53+
#[serde(skip_serializing_if = "Option::is_none")]
54+
pub latency_ms: Option<u64>,
55+
56+
/// Failure detail. Absent on success.
57+
#[serde(skip_serializing_if = "Option::is_none")]
58+
pub error: Option<String>,
59+
}
60+
61+
/// Aggregated dependency results.
62+
#[derive(Debug, Serialize)]
63+
pub struct HealthChecks {
64+
pub database: DependencyCheck,
65+
}
66+
67+
/// Readiness response payload.
1968
#[derive(Debug, Serialize)]
2069
pub struct HealthResponse {
21-
/// Service status.
70+
/// Overall status: `"healthy"` if every dependency is healthy.
2271
pub status: &'static str,
2372

2473
/// Service version.
2574
pub version: &'static str,
26-
}
2775

28-
/// Simple health check - returns 200 OK.
29-
async fn health() -> impl IntoResponse {
30-
StatusCode::OK
76+
/// Per-dependency breakdown.
77+
pub checks: HealthChecks,
3178
}
3279

33-
/// Kubernetes liveness probe.
80+
/// Kubernetes liveness probe — process responsiveness only.
3481
async fn healthz() -> impl IntoResponse {
3582
StatusCode::OK
3683
}
3784

38-
/// Kubernetes readiness probe with detailed status.
39-
async fn readyz() -> impl IntoResponse {
85+
/// Kubernetes readiness probe — reflects the cached background DB state.
86+
async fn readyz(State(state): State<Arc<HealthRouterState>>) -> impl IntoResponse {
87+
render_response(&state.health.borrow())
88+
}
89+
90+
/// Convenience alias of [`readyz`] for monitors that probe `/health`.
91+
async fn health(State(state): State<Arc<HealthRouterState>>) -> impl IntoResponse {
92+
render_response(&state.health.borrow())
93+
}
94+
95+
fn render_response(state: &HealthState) -> (StatusCode, Json<HealthResponse>) {
96+
let database = render_database(state);
97+
let healthy = state.is_healthy();
4098
let response = HealthResponse {
41-
status: "healthy",
99+
status: if healthy {
100+
STATUS_HEALTHY
101+
} else {
102+
STATUS_UNHEALTHY
103+
},
42104
version: openshell_core::VERSION,
105+
checks: HealthChecks { database },
106+
};
107+
let code = if healthy {
108+
StatusCode::OK
109+
} else {
110+
StatusCode::SERVICE_UNAVAILABLE
43111
};
112+
(code, Json(response))
113+
}
114+
115+
fn render_database(state: &HealthState) -> DependencyCheck {
116+
match state {
117+
HealthState::Initializing => DependencyCheck {
118+
status: STATUS_UNHEALTHY,
119+
latency_ms: None,
120+
error: Some(DATABASE_INITIALIZING_ERROR.to_string()),
121+
},
122+
HealthState::Healthy { latency_ms } => DependencyCheck {
123+
status: STATUS_HEALTHY,
124+
latency_ms: Some(*latency_ms),
125+
error: None,
126+
},
127+
HealthState::Unhealthy(HealthError::Unavailable { latency_ms }) => DependencyCheck {
128+
status: STATUS_UNHEALTHY,
129+
latency_ms: Some(*latency_ms),
130+
error: Some(DATABASE_UNAVAILABLE_ERROR.to_string()),
131+
},
132+
HealthState::Unhealthy(HealthError::Timeout) => DependencyCheck {
133+
status: STATUS_UNHEALTHY,
134+
latency_ms: None,
135+
error: Some(DATABASE_TIMEOUT_ERROR.to_string()),
136+
},
137+
}
138+
}
44139

45-
(StatusCode::OK, Json(response))
140+
/// Build the health router by spawning a background [`DatabaseHealthMonitor`]
141+
/// for `store` and wiring its receiver into the handlers.
142+
///
143+
/// Returns immediately so the listener is responsive from t=0. The router's
144+
/// initial state is [`HealthState::Initializing`] — `/readyz` and `/health`
145+
/// will return `503` with a structured `{"checks": {"database": {"status":
146+
/// "initializing"}}}` payload until the background monitor publishes its
147+
/// first real probe outcome (within one [`crate::readiness::DEFAULT_CHECK_INTERVAL`]).
148+
/// The background task continues running detached for the remainder of the
149+
/// runtime.
150+
pub fn health_router(store: Arc<Store>) -> Router {
151+
let monitor = DatabaseHealthMonitor::spawn(store);
152+
health_router_from_receiver(monitor.subscribe())
46153
}
47154

48-
/// Create the health router.
49-
pub fn health_router() -> Router {
155+
/// Build the health router from an existing monitor receiver.
156+
///
157+
/// Crate-internal: used by [`health_router`] and by tests that drive the
158+
/// `HealthState` directly without spinning up the polling task.
159+
pub fn health_router_from_receiver(receiver: watch::Receiver<HealthState>) -> Router {
160+
let state = Arc::new(HealthRouterState { health: receiver });
161+
50162
Router::new()
51163
.route("/health", get(health))
52164
.route("/healthz", get(healthz))
53165
.route("/readyz", get(readyz))
166+
.with_state(state)
54167
}
55168

56169
/// Create the metrics router for the dedicated metrics port.
@@ -64,7 +177,7 @@ async fn render_metrics(State(handle): State<PrometheusHandle>) -> impl IntoResp
64177
handle.render()
65178
}
66179

67-
/// Create the HTTP router.
180+
/// Create the HTTP router served on the multiplexed gateway port.
68181
pub fn http_router(state: Arc<crate::ServerState>) -> Router {
69182
crate::ws_tunnel::router(state.clone())
70183
.merge(crate::auth::router(state.clone()))
@@ -305,3 +418,149 @@ mod tests {
305418
assert!(!browser_context_allows_plaintext_service_request(&req));
306419
}
307420
}
421+
422+
#[cfg(test)]
423+
mod readiness_tests {
424+
use super::*;
425+
use axum::body::Body;
426+
use http::Request;
427+
use http_body_util::BodyExt;
428+
use tower::ServiceExt;
429+
430+
async fn in_memory_store() -> Arc<Store> {
431+
Arc::new(
432+
Store::connect("sqlite::memory:")
433+
.await
434+
.expect("connect in-memory sqlite store"),
435+
)
436+
}
437+
438+
/// Build a [`health_router`] that has already observed its first probe
439+
/// outcome. Test-only — production code must not block the listener on
440+
/// the first poll (see [`health_router`]).
441+
async fn polled_health_router(store: Arc<Store>) -> Router {
442+
let mut monitor = DatabaseHealthMonitor::spawn(store);
443+
monitor.wait_until_polled().await;
444+
health_router_from_receiver(monitor.subscribe())
445+
}
446+
447+
async fn get(router: Router, path: &str) -> (StatusCode, serde_json::Value) {
448+
let response = router
449+
.oneshot(Request::get(path).body(Body::empty()).unwrap())
450+
.await
451+
.expect("router responds");
452+
let status = response.status();
453+
let bytes = response
454+
.into_body()
455+
.collect()
456+
.await
457+
.expect("collect body")
458+
.to_bytes();
459+
let body = if bytes.is_empty() {
460+
serde_json::Value::Null
461+
} else {
462+
serde_json::from_slice(&bytes).expect("response is valid JSON")
463+
};
464+
(status, body)
465+
}
466+
467+
/// Build a router whose state is driven by a `HealthState` we control,
468+
/// so each handler-shape test can pin the exact mapping under test.
469+
fn router_with_state(state: HealthState) -> Router {
470+
let (_tx, rx) = watch::channel(state);
471+
health_router_from_receiver(rx)
472+
}
473+
474+
#[tokio::test]
475+
async fn healthz_is_minimal_and_does_not_touch_the_database() {
476+
// Liveness must succeed even when the database is unreachable —
477+
// otherwise a transient outage would CrashLoopBackOff the gateway.
478+
let store = in_memory_store().await;
479+
store.close().await;
480+
let (status, body) = get(health_router(store), "/healthz").await;
481+
assert_eq!(status, StatusCode::OK);
482+
assert!(body.is_null(), "healthz must return an empty body");
483+
}
484+
485+
#[tokio::test]
486+
async fn readyz_returns_200_with_healthy_payload_when_db_is_reachable() {
487+
let store = in_memory_store().await;
488+
let (status, body) = get(polled_health_router(store).await, "/readyz").await;
489+
assert_eq!(status, StatusCode::OK);
490+
assert_eq!(body["status"], "healthy");
491+
assert_eq!(body["checks"]["database"]["status"], "healthy");
492+
assert!(
493+
body["checks"]["database"]["latency_ms"].is_number(),
494+
"expected latency_ms in healthy payload"
495+
);
496+
assert!(
497+
body["checks"]["database"]["error"].is_null(),
498+
"healthy payload must omit the error field"
499+
);
500+
}
501+
502+
#[tokio::test]
503+
async fn health_alias_mirrors_readyz_when_db_is_reachable() {
504+
let store = in_memory_store().await;
505+
let (status, body) = get(polled_health_router(store).await, "/health").await;
506+
assert_eq!(status, StatusCode::OK);
507+
assert_eq!(body["status"], "healthy");
508+
assert_eq!(body["checks"]["database"]["status"], "healthy");
509+
}
510+
511+
#[tokio::test]
512+
async fn readyz_returns_503_with_unhealthy_payload_when_db_is_unreachable() {
513+
let store = in_memory_store().await;
514+
store.close().await;
515+
let (status, body) = get(polled_health_router(store).await, "/readyz").await;
516+
assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE);
517+
assert_eq!(body["status"], "unhealthy");
518+
assert_eq!(body["checks"]["database"]["status"], "unhealthy");
519+
assert_eq!(
520+
body["checks"]["database"]["error"],
521+
DATABASE_UNAVAILABLE_ERROR
522+
);
523+
}
524+
525+
#[tokio::test]
526+
async fn health_alias_returns_503_when_db_is_unreachable() {
527+
let store = in_memory_store().await;
528+
store.close().await;
529+
let (status, body) = get(polled_health_router(store).await, "/health").await;
530+
assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE);
531+
assert_eq!(body["status"], "unhealthy");
532+
assert_eq!(body["checks"]["database"]["status"], "unhealthy");
533+
}
534+
535+
#[tokio::test]
536+
async fn readyz_reports_initializing_state_as_unhealthy_with_explicit_reason() {
537+
let (status, body) = get(router_with_state(HealthState::Initializing), "/readyz").await;
538+
assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE);
539+
assert_eq!(body["status"], "unhealthy");
540+
assert_eq!(body["checks"]["database"]["status"], "unhealthy");
541+
assert_eq!(
542+
body["checks"]["database"]["error"],
543+
DATABASE_INITIALIZING_ERROR
544+
);
545+
assert!(
546+
body["checks"]["database"]["latency_ms"].is_null(),
547+
"initializing state has no latency to report yet"
548+
);
549+
}
550+
551+
#[tokio::test]
552+
async fn readyz_renders_timeout_state_with_dedicated_error_string() {
553+
let (status, body) = get(
554+
router_with_state(HealthState::Unhealthy(HealthError::Timeout)),
555+
"/readyz",
556+
)
557+
.await;
558+
assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE);
559+
assert_eq!(body["status"], "unhealthy");
560+
assert_eq!(body["checks"]["database"]["error"], DATABASE_TIMEOUT_ERROR);
561+
assert!(
562+
body["checks"]["database"]["latency_ms"].is_null(),
563+
"timeout state has no completed-call latency"
564+
);
565+
}
566+
}

crates/openshell-server/src/lib.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ mod multiplex;
3232
mod persistence;
3333
pub(crate) mod policy_store;
3434
mod provider_refresh;
35+
mod readiness;
3536
mod sandbox_index;
3637
mod sandbox_watch;
3738
mod service_routing;
@@ -63,7 +64,7 @@ pub use grpc::OpenShellService;
6364
pub use http::{health_router, http_router, metrics_router, service_http_router};
6465
pub use multiplex::{MultiplexService, MultiplexedService};
6566
use openshell_driver_kubernetes::KubernetesComputeConfig;
66-
use persistence::Store;
67+
pub use persistence::Store;
6768
use sandbox_index::SandboxIndex;
6869
use sandbox_watch::SandboxWatchBus;
6970
pub use tls::TlsAcceptor;
@@ -196,6 +197,7 @@ pub async fn run_server(
196197
if database_url.is_empty() {
197198
return Err(Error::config("database_url is required"));
198199
}
200+
199201
let store = Arc::new(Store::connect(database_url).await?);
200202

201203
let oidc_cache = if let Some(ref oidc) = config.oidc {
@@ -368,9 +370,12 @@ pub async fn run_server(
368370
))
369371
})?;
370372
info!(address = %health_bind_address, "Health server listening");
373+
// `health_router` returns immediately; the listener serves
374+
// `Initializing → 503` until the background monitor publishes the
375+
// first real probe outcome, so the endpoint is always responsive.
376+
let router = health_router(store.clone());
371377
tokio::spawn(async move {
372-
if let Err(e) = axum::serve(health_listener, health_router().into_make_service()).await
373-
{
378+
if let Err(e) = axum::serve(health_listener, router.into_make_service()).await {
374379
error!("Health server error: {e}");
375380
}
376381
});

0 commit comments

Comments
 (0)