Skip to content

Commit f7c8262

Browse files
authored
feat: add atlas-server observability metrics (#47)
* feat: add atlas-server observability metrics * fix: address observability review feedback * fix: satisfy backend clippy * feat: extend observability with lag, missing blocks, and processing duration metrics - Track indexer lag (chain head vs indexed head) as a gauge - Track known missing blocks from failed_blocks table - Add block processing duration histogram (excluding idle sleep) - Add head block timestamp gauge - Make install_prometheus_recorder idempotent via OnceLock - Add custom histogram buckets for processing duration * fix: drive head-block metrics from batch watermark, not end_block - Capture batch.last_block before write_batch consumes the batch and use it for set_indexer_head_block and indexed_head; if the tail block landed in failed_blocks the persisted watermark is lower than end_block, so using end_block would under-report lag and misalign the timestamp gauge - Strengthen install_prometheus_recorder_is_idempotent: emit a metric inside the test so render() is non-empty regardless of execution order, then assert both handles see the same output
1 parent 7fd9b9b commit f7c8262

30 files changed

Lines changed: 1258 additions & 257 deletions

backend/Cargo.toml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@ anyhow = "1.0"
4141
tracing = "0.1"
4242
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
4343

44+
# Metrics
45+
metrics = "0.24"
46+
metrics-exporter-prometheus = "0.16"
47+
4448
# Config
4549
dotenvy = "0.15"
4650

@@ -55,7 +59,7 @@ hex = "0.4"
5559
chrono = { version = "0.4", features = ["serde"] }
5660

5761
# Testing
58-
testcontainers = "0.27"
62+
testcontainers = { version = "0.27", features = ["blocking"] }
5963
testcontainers-modules = { version = "0.15", features = ["postgres"] }
6064

6165
# CLI

backend/crates/atlas-server/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ thiserror = { workspace = true }
2323
anyhow = { workspace = true }
2424
tracing = { workspace = true }
2525
tracing-subscriber = { workspace = true }
26+
metrics = { workspace = true }
27+
metrics-exporter-prometheus = { workspace = true }
2628
dotenvy = { workspace = true }
2729
bigdecimal = { workspace = true }
2830
hex = { workspace = true }

backend/crates/atlas-server/src/api/error.rs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,34 @@ impl Deref for ApiError {
4444
}
4545
}
4646

47+
fn error_type(err: &AtlasError) -> Option<&'static str> {
48+
match err {
49+
AtlasError::Database(_) => Some("database"),
50+
AtlasError::Internal(_) => Some("internal"),
51+
AtlasError::Config(_) => Some("config"),
52+
AtlasError::Rpc(_) => Some("rpc_request"),
53+
AtlasError::MetadataFetch(_) => Some("metadata_fetch"),
54+
_ => None,
55+
}
56+
}
57+
4758
impl IntoResponse for ApiError {
4859
fn into_response(self) -> Response {
4960
use atlas_common::AtlasError;
5061

5162
let status =
5263
StatusCode::from_u16(self.0.status_code()).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
5364

65+
// Increment error counter for Prometheus alerting
66+
if let Some(error_type) = error_type(&self.0) {
67+
metrics::counter!(
68+
"atlas_errors_total",
69+
"component" => "api",
70+
"error_type" => error_type
71+
)
72+
.increment(1);
73+
}
74+
5475
// Determine the client-facing message based on error type.
5576
// Internal details are logged server-side to avoid leaking stack traces or
5677
// database internals to callers.
@@ -119,6 +140,31 @@ mod tests {
119140
use super::*;
120141
use axum::body::to_bytes;
121142

143+
#[test]
144+
fn error_type_maps_expected_variants() {
145+
assert_eq!(
146+
error_type(&AtlasError::Database(sqlx::Error::RowNotFound)),
147+
Some("database")
148+
);
149+
assert_eq!(
150+
error_type(&AtlasError::Internal("x".to_string())),
151+
Some("internal")
152+
);
153+
assert_eq!(
154+
error_type(&AtlasError::Config("x".to_string())),
155+
Some("config")
156+
);
157+
assert_eq!(
158+
error_type(&AtlasError::Rpc("x".to_string())),
159+
Some("rpc_request")
160+
);
161+
assert_eq!(
162+
error_type(&AtlasError::MetadataFetch("x".to_string())),
163+
Some("metadata_fetch")
164+
);
165+
assert_eq!(error_type(&AtlasError::NotFound("x".to_string())), None);
166+
}
167+
122168
#[tokio::test]
123169
async fn too_many_requests_sets_retry_after_header_and_body() {
124170
let response = ApiError(AtlasError::TooManyRequests {

backend/crates/atlas-server/src/api/handlers/faucet.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,9 @@ mod tests {
162162
let head_tracker = Arc::new(crate::head::HeadTracker::empty(10));
163163
let (tx, _) = broadcast::channel(1);
164164
let (da_tx, _) = broadcast::channel(1);
165+
let prometheus_handle = metrics_exporter_prometheus::PrometheusBuilder::new()
166+
.build_recorder()
167+
.handle();
165168
Arc::new(AppState {
166169
pool,
167170
block_events_tx: tx,
@@ -180,6 +183,8 @@ mod tests {
180183
background_color_light: None,
181184
success_color: None,
182185
error_color: None,
186+
metrics: crate::metrics::Metrics::new(),
187+
prometheus_handle,
183188
})
184189
}
185190

Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
use axum::{extract::State, http::StatusCode, response::IntoResponse, Json};
2+
use chrono::{DateTime, Utc};
3+
use serde::Serialize;
4+
use std::sync::Arc;
5+
6+
use crate::api::AppState;
7+
8+
const MAX_INDEXER_AGE_MINUTES: i64 = 5;
9+
10+
#[derive(Serialize)]
11+
struct HealthResponse {
12+
status: &'static str,
13+
#[serde(skip_serializing_if = "Option::is_none")]
14+
reason: Option<String>,
15+
}
16+
17+
fn readiness_status(
18+
latest_indexed_at: Option<DateTime<Utc>>,
19+
now: DateTime<Utc>,
20+
) -> (StatusCode, HealthResponse) {
21+
let Some(indexed_at) = latest_indexed_at else {
22+
return (
23+
StatusCode::SERVICE_UNAVAILABLE,
24+
HealthResponse {
25+
status: "not_ready",
26+
reason: Some("indexer state unavailable".to_string()),
27+
},
28+
);
29+
};
30+
31+
let age = now - indexed_at;
32+
if age > chrono::Duration::minutes(MAX_INDEXER_AGE_MINUTES) {
33+
return (
34+
StatusCode::SERVICE_UNAVAILABLE,
35+
HealthResponse {
36+
status: "not_ready",
37+
reason: Some(format!(
38+
"indexer stale: last block indexed {}s ago",
39+
age.num_seconds()
40+
)),
41+
},
42+
);
43+
}
44+
45+
(
46+
StatusCode::OK,
47+
HealthResponse {
48+
status: "ready",
49+
reason: None,
50+
},
51+
)
52+
}
53+
54+
/// GET /health/live — liveness probe (process is alive)
55+
pub async fn liveness() -> impl IntoResponse {
56+
Json(HealthResponse {
57+
status: "ok",
58+
reason: None,
59+
})
60+
}
61+
62+
/// GET /health/ready — readiness probe (DB reachable, indexer fresh)
63+
pub async fn readiness(State(state): State<Arc<AppState>>) -> impl IntoResponse {
64+
// Check DB connectivity
65+
if let Err(e) = sqlx::query("SELECT 1").execute(&state.pool).await {
66+
tracing::warn!(error = %e, "readiness database check failed");
67+
return (
68+
StatusCode::SERVICE_UNAVAILABLE,
69+
Json(HealthResponse {
70+
status: "not_ready",
71+
reason: Some("database unreachable".to_string()),
72+
}),
73+
);
74+
}
75+
76+
let latest = match super::status::latest_indexed_block(state.as_ref()).await {
77+
Ok(latest) => latest,
78+
Err(e) => {
79+
tracing::warn!(error = %e, "readiness indexer state check failed");
80+
return (
81+
StatusCode::SERVICE_UNAVAILABLE,
82+
Json(HealthResponse {
83+
status: "not_ready",
84+
reason: Some("indexer state unavailable".to_string()),
85+
}),
86+
);
87+
}
88+
};
89+
90+
let (status, body) = readiness_status(latest.map(|(_, indexed_at)| indexed_at), Utc::now());
91+
(status, Json(body))
92+
}
93+
94+
#[cfg(test)]
95+
mod tests {
96+
use super::*;
97+
use crate::head::HeadTracker;
98+
use crate::metrics::Metrics;
99+
use axum::body::to_bytes;
100+
use sqlx::postgres::PgPoolOptions;
101+
use std::sync::Arc;
102+
use tokio::sync::broadcast;
103+
104+
fn app_state(pool: sqlx::PgPool, head_tracker: Arc<HeadTracker>) -> Arc<AppState> {
105+
let (block_tx, _) = broadcast::channel(1);
106+
let (da_tx, _) = broadcast::channel(1);
107+
let prometheus_handle = metrics_exporter_prometheus::PrometheusBuilder::new()
108+
.build_recorder()
109+
.handle();
110+
111+
Arc::new(AppState {
112+
pool,
113+
block_events_tx: block_tx,
114+
da_events_tx: da_tx,
115+
head_tracker,
116+
rpc_url: String::new(),
117+
da_tracking_enabled: false,
118+
faucet: None,
119+
chain_id: 1,
120+
chain_name: "Test Chain".to_string(),
121+
chain_logo_url: None,
122+
chain_logo_url_light: None,
123+
chain_logo_url_dark: None,
124+
accent_color: None,
125+
background_color_dark: None,
126+
background_color_light: None,
127+
success_color: None,
128+
error_color: None,
129+
metrics: Metrics::new(),
130+
prometheus_handle,
131+
})
132+
}
133+
134+
async fn json_response(response: axum::response::Response) -> (StatusCode, serde_json::Value) {
135+
let status = response.status();
136+
let body = to_bytes(response.into_body(), usize::MAX)
137+
.await
138+
.expect("read response body");
139+
let json = serde_json::from_slice(&body).expect("parse json response");
140+
(status, json)
141+
}
142+
143+
#[tokio::test]
144+
async fn liveness_returns_ok() {
145+
let (status, json) = json_response(liveness().await.into_response()).await;
146+
147+
assert_eq!(status, StatusCode::OK);
148+
assert_eq!(json["status"], "ok");
149+
assert!(json.get("reason").is_none());
150+
}
151+
152+
#[tokio::test]
153+
async fn readiness_returns_unavailable_when_database_is_down() {
154+
let pool = PgPoolOptions::new()
155+
.connect_lazy("postgres://postgres:postgres@127.0.0.1:1/atlas")
156+
.expect("create lazy pool");
157+
let state = app_state(pool, Arc::new(HeadTracker::empty(10)));
158+
159+
let (status, json) = json_response(readiness(State(state)).await.into_response()).await;
160+
161+
assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE);
162+
assert_eq!(json["status"], "not_ready");
163+
assert_eq!(json["reason"], "database unreachable");
164+
}
165+
166+
#[test]
167+
fn readiness_returns_unavailable_when_indexer_state_is_missing() {
168+
let (status, body) = readiness_status(None, Utc::now());
169+
assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE);
170+
assert_eq!(body.status, "not_ready");
171+
assert_eq!(body.reason.as_deref(), Some("indexer state unavailable"));
172+
}
173+
174+
#[test]
175+
fn readiness_returns_unavailable_for_stale_indexer_state() {
176+
let (status, body) = readiness_status(
177+
Some(Utc::now() - chrono::Duration::minutes(MAX_INDEXER_AGE_MINUTES + 1)),
178+
Utc::now(),
179+
);
180+
assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE);
181+
assert_eq!(body.status, "not_ready");
182+
assert!(body
183+
.reason
184+
.as_deref()
185+
.expect("reason string")
186+
.contains("indexer stale"));
187+
}
188+
189+
#[test]
190+
fn readiness_returns_ready_for_fresh_indexer_state() {
191+
let (status, body) = readiness_status(Some(Utc::now()), Utc::now());
192+
assert_eq!(status, StatusCode::OK);
193+
assert_eq!(body.status, "ready");
194+
assert!(body.reason.is_none());
195+
}
196+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
use axum::extract::State;
2+
use std::sync::Arc;
3+
4+
use crate::api::AppState;
5+
6+
/// GET /metrics — Prometheus text format
7+
pub async fn metrics(State(state): State<Arc<AppState>>) -> String {
8+
state.prometheus_handle.render()
9+
}
10+
11+
#[cfg(test)]
12+
mod tests {
13+
use super::*;
14+
use crate::head::HeadTracker;
15+
use crate::metrics::Metrics;
16+
use sqlx::postgres::PgPoolOptions;
17+
use std::sync::OnceLock;
18+
use tokio::sync::broadcast;
19+
20+
fn test_prometheus_handle() -> metrics_exporter_prometheus::PrometheusHandle {
21+
static PROMETHEUS_HANDLE: OnceLock<metrics_exporter_prometheus::PrometheusHandle> =
22+
OnceLock::new();
23+
24+
PROMETHEUS_HANDLE
25+
.get_or_init(crate::metrics::install_prometheus_recorder)
26+
.clone()
27+
}
28+
29+
#[tokio::test]
30+
async fn metrics_handler_renders_prometheus_output() {
31+
let pool = PgPoolOptions::new()
32+
.connect_lazy("postgres://test@localhost:5432/test")
33+
.expect("lazy pool");
34+
let (block_tx, _) = broadcast::channel(1);
35+
let (da_tx, _) = broadcast::channel(1);
36+
let prometheus_handle = test_prometheus_handle();
37+
let recorder_metrics = Metrics::new();
38+
recorder_metrics.set_indexer_head_block(42);
39+
let state = Arc::new(AppState {
40+
pool,
41+
block_events_tx: block_tx,
42+
da_events_tx: da_tx,
43+
head_tracker: Arc::new(HeadTracker::empty(10)),
44+
rpc_url: String::new(),
45+
da_tracking_enabled: false,
46+
faucet: None,
47+
chain_id: 1,
48+
chain_name: "Test Chain".to_string(),
49+
chain_logo_url: None,
50+
chain_logo_url_light: None,
51+
chain_logo_url_dark: None,
52+
accent_color: None,
53+
background_color_dark: None,
54+
background_color_light: None,
55+
success_color: None,
56+
error_color: None,
57+
metrics: recorder_metrics,
58+
prometheus_handle,
59+
});
60+
61+
let body = super::metrics(State(state)).await;
62+
63+
assert!(body.contains("atlas_indexer_head_block"));
64+
}
65+
}

backend/crates/atlas-server/src/api/handlers/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@ pub mod blocks;
33
pub mod config;
44
pub mod etherscan;
55
pub mod faucet;
6+
pub mod health;
67
pub mod logs;
8+
pub mod metrics;
79
pub mod nfts;
810
pub mod proxy;
911
pub mod search;

0 commit comments

Comments
 (0)