Skip to content

Commit f5fdaae

Browse files
prestwichclaude
andcommitted
feat(cold): hoist write SLO and stream-setup timeout into the handle
Adds two accessors to the `ColdStorageBackend` trait: fn read_timeout(&self) -> Option<Duration> { None } fn write_timeout(&self) -> Option<Duration> { None } Wired through `MdbxColdBackend`, `SqlColdBackend`, and `EitherCold`. `MemColdBackend` returns `None` (already-documented test exemption). Two behaviour changes use these: 1. The advisory write-SLO WARN moves from the MDBX backend (`warn_on_overrun` per-method) to `ColdStorage::spawn_write`. Timing is now captured before `write_sem` acquisition, so the elapsed value covers the queue wait, the read drain, and the commit end-to-end. The failure shape that wedged production at #56 — slow readers gating writes — now surfaces as a write-SLO violation rather than as a sub-threshold backend timing. 2. `stream_logs`'s setup `get_latest_block` is wrapped in `tokio::time::timeout(backend.read_timeout(), ...)`. Without this, a stuck point lookup (cold MDBX page) or a saturated PG pool parking on `acquire_timeout` could pin N concurrent setup callers indefinitely with no permit cap. The setup read still bypasses `read_sem` and the drain barrier by design. Also drops the now-unused `tracing` dep from `signet-cold-mdbx` and updates the type docs to point at the handle's new WARN path. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent d3e3210 commit f5fdaae

6 files changed

Lines changed: 194 additions & 68 deletions

File tree

crates/cold-mdbx/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ signet-libmdbx.workspace = true
2424
signet-storage-types.workspace = true
2525
thiserror.workspace = true
2626
tokio.workspace = true
27-
tracing.workspace = true
2827

2928
[dev-dependencies]
3029
signet-hot-mdbx = { workspace = true, features = ["test-utils"] }

crates/cold-mdbx/src/backend.rs

Lines changed: 52 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ fn produce_log_stream_blocking(
125125
for block_num in from..=to {
126126
// Check the deadline before starting each block so we
127127
// don't begin reading after the caller's timeout.
128-
if std::time::Instant::now() > deadline {
128+
if Instant::now() > deadline {
129129
let _ = sender.blocking_send(Err(ColdStorageError::StreamDeadlineExceeded));
130130
return;
131131
}
@@ -149,7 +149,7 @@ fn produce_log_stream_blocking(
149149
for result in iter {
150150
// Per-receipt deadline check bounds iteration cost across
151151
// blocks with many receipts.
152-
if std::time::Instant::now() > deadline {
152+
if Instant::now() > deadline {
153153
let _ = sender.blocking_send(Err(ColdStorageError::StreamDeadlineExceeded));
154154
return;
155155
}
@@ -165,7 +165,7 @@ fn produce_log_stream_blocking(
165165
// so without this check a single block with thousands
166166
// of matching logs can run arbitrarily past the
167167
// deadline.
168-
if std::time::Instant::now() > deadline {
168+
if Instant::now() > deadline {
169169
let _ = sender.blocking_send(Err(ColdStorageError::StreamDeadlineExceeded));
170170
return;
171171
}
@@ -226,10 +226,11 @@ fn produce_log_stream_blocking(
226226
/// `tokio::time::timeout`. Callers that need fail-fast behavior on
227227
/// stuck I/O should apply their own timeout at the call site.
228228
/// - **Writes** (`append_block`, `append_blocks`, `truncate_above`,
229-
/// `drain_above`) record elapsed time against `write_timeout` and
230-
/// emit a [`tracing::warn!`] on overrun, but the commit is
231-
/// uninterruptible: `write_timeout` is an SLO/alerting signal only,
232-
/// not a hard abort.
229+
/// `drain_above`) are uninterruptible MDBX commits. The handle
230+
/// measures end-to-end latency (including `write_sem` wait and the
231+
/// read drain) against `write_timeout` and emits a `tracing::warn!`
232+
/// on overrun via the `ColdStorageBackend::write_timeout` accessor;
233+
/// `write_timeout` is an SLO/alerting signal only, not a hard abort.
233234
#[derive(Clone)]
234235
pub struct MdbxColdBackend {
235236
/// The MDBX environment.
@@ -239,7 +240,7 @@ pub struct MdbxColdBackend {
239240
/// lookups do NOT consult this deadline — see the type-level docs.
240241
read_timeout: Duration,
241242
/// Advisory deadline for write operations. Writes that exceed this are
242-
/// logged via [`tracing::warn!`] but still report success.
243+
/// logged via `tracing::warn!` but still report success.
243244
write_timeout: Duration,
244245
}
245246

@@ -262,17 +263,30 @@ impl MdbxColdBackend {
262263
/// lookups (`get_header`, `get_transaction`, etc.) do NOT consult
263264
/// this deadline — see the type-level docs on [`MdbxColdBackend`]
264265
/// for the exemption rationale and its operational implications.
266+
///
267+
/// # Panics
268+
///
269+
/// Panics if `read_timeout` is zero — a zero deadline is a
270+
/// configuration mistake, not a "disable" signal, and the trait
271+
/// contract requires a real bound.
265272
#[must_use]
266-
pub const fn with_read_timeout(mut self, read_timeout: Duration) -> Self {
273+
pub fn with_read_timeout(mut self, read_timeout: Duration) -> Self {
274+
assert!(!read_timeout.is_zero(), "read_timeout must be non-zero");
267275
self.read_timeout = read_timeout;
268276
self
269277
}
270278

271279
/// Set the advisory write deadline. Writes exceeding this threshold
272-
/// emit a [`tracing::warn!`] but still report success to the caller;
280+
/// emit a `tracing::warn!` but still report success to the caller;
273281
/// MDBX commits are uninterruptible.
282+
///
283+
/// # Panics
284+
///
285+
/// Panics if `write_timeout` is zero. See
286+
/// [`with_read_timeout`](Self::with_read_timeout).
274287
#[must_use]
275-
pub const fn with_write_timeout(mut self, write_timeout: Duration) -> Self {
288+
pub fn with_write_timeout(mut self, write_timeout: Duration) -> Self {
289+
assert!(!write_timeout.is_zero(), "write_timeout must be non-zero");
276290
self.write_timeout = write_timeout;
277291
self
278292
}
@@ -726,6 +740,13 @@ impl MdbxColdBackend {
726740
if !filter.matches(&log) {
727741
continue;
728742
}
743+
// Per-log deadline check: a single receipt with
744+
// thousands of matching logs would otherwise run
745+
// unchecked past the deadline. Mirrors the
746+
// streaming path in `produce_log_stream_blocking`.
747+
if Instant::now() > deadline {
748+
return Err(MdbxColdError::Timeout(read_timeout));
749+
}
729750
if results.len() >= max_logs {
730751
return Err(MdbxColdError::TooManyLogs(max_logs));
731752
}
@@ -928,67 +949,47 @@ impl ColdStorageRead for MdbxColdBackend {
928949
}
929950
}
930951

931-
/// Log an advisory warning if a successful write exceeded the threshold.
932-
///
933-
/// Only logs on success: a failed write that overran the threshold already
934-
/// surfaces a `Backend` error to the caller, and a noisy overrun WARN would
935-
/// poison SLO alerting built on this signal.
936-
fn warn_on_overrun(op: &'static str, elapsed: Duration, threshold: Duration, is_ok: bool) {
937-
if is_ok && elapsed > threshold {
938-
tracing::warn!(
939-
op,
940-
elapsed_ms = elapsed.as_millis() as u64,
941-
threshold_ms = threshold.as_millis() as u64,
942-
"mdbx write exceeded advisory write timeout",
943-
);
944-
}
945-
}
946-
947952
impl ColdStorageWrite for MdbxColdBackend {
948953
async fn append_block(&self, data: BlockData) -> ColdResult<()> {
949-
let threshold = self.write_timeout;
950954
let this = self.clone();
951-
let start = Instant::now();
952-
let result = tokio::task::spawn_blocking(move || this.append_block_inner(data))
955+
tokio::task::spawn_blocking(move || this.append_block_inner(data))
953956
.await
954-
.map_err(|_| ColdStorageError::TaskTerminated)?;
955-
warn_on_overrun("append_block", start.elapsed(), threshold, result.is_ok());
956-
Ok(result?)
957+
.map_err(|_| ColdStorageError::TaskTerminated)?
958+
.map_err(ColdStorageError::from)
957959
}
958960

959961
async fn append_blocks(&self, data: Vec<BlockData>) -> ColdResult<()> {
960-
let threshold = self.write_timeout;
961962
let this = self.clone();
962-
let start = Instant::now();
963-
let result = tokio::task::spawn_blocking(move || this.append_blocks_inner(data))
963+
tokio::task::spawn_blocking(move || this.append_blocks_inner(data))
964964
.await
965-
.map_err(|_| ColdStorageError::TaskTerminated)?;
966-
warn_on_overrun("append_blocks", start.elapsed(), threshold, result.is_ok());
967-
Ok(result?)
965+
.map_err(|_| ColdStorageError::TaskTerminated)?
966+
.map_err(ColdStorageError::from)
968967
}
969968

970969
async fn truncate_above(&self, block: BlockNumber) -> ColdResult<()> {
971-
let threshold = self.write_timeout;
972970
let this = self.clone();
973-
let start = Instant::now();
974-
let result = tokio::task::spawn_blocking(move || this.truncate_above_inner(block))
971+
tokio::task::spawn_blocking(move || this.truncate_above_inner(block))
975972
.await
976-
.map_err(|_| ColdStorageError::TaskTerminated)?;
977-
warn_on_overrun("truncate_above", start.elapsed(), threshold, result.is_ok());
978-
Ok(result?)
973+
.map_err(|_| ColdStorageError::TaskTerminated)?
974+
.map_err(ColdStorageError::from)
979975
}
980976
}
981977

982978
impl ColdStorageBackend for MdbxColdBackend {
979+
fn read_timeout(&self) -> Option<Duration> {
980+
Some(self.read_timeout)
981+
}
982+
983+
fn write_timeout(&self) -> Option<Duration> {
984+
Some(self.write_timeout)
985+
}
986+
983987
async fn drain_above(&self, block: BlockNumber) -> ColdResult<Vec<Vec<ColdReceipt>>> {
984-
let threshold = self.write_timeout;
985988
let this = self.clone();
986-
let start = Instant::now();
987-
let result = tokio::task::spawn_blocking(move || this.drain_above_inner(block))
989+
tokio::task::spawn_blocking(move || this.drain_above_inner(block))
988990
.await
989-
.map_err(|_| ColdStorageError::TaskTerminated)?;
990-
warn_on_overrun("drain_above", start.elapsed(), threshold, result.is_ok());
991-
Ok(result?)
991+
.map_err(|_| ColdStorageError::TaskTerminated)?
992+
.map_err(ColdStorageError::from)
992993
}
993994
}
994995

crates/cold-sql/src/backend.rs

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -150,8 +150,15 @@ impl SqlColdBackend {
150150
/// On Postgres this sets `statement_timeout` on every transaction
151151
/// opened by a read method. On SQLite the value is stored but
152152
/// not enforced — SQLite has no equivalent mechanism.
153+
///
154+
/// # Panics
155+
///
156+
/// Panics if `d` rounds to 0 ms. Postgres interprets
157+
/// `statement_timeout = 0` as "no timeout", which would silently
158+
/// disable the trait-level mandatory-timeout contract.
153159
#[must_use]
154-
pub const fn with_read_timeout(mut self, d: Duration) -> Self {
160+
pub fn with_read_timeout(mut self, d: Duration) -> Self {
161+
assert!(d.as_millis() >= 1, "read_timeout must be >= 1ms (got {d:?})");
155162
self.read_timeout = d;
156163
self
157164
}
@@ -161,8 +168,13 @@ impl SqlColdBackend {
161168
/// On Postgres this sets `statement_timeout` on every transaction
162169
/// opened by a write method. On SQLite the value is stored but
163170
/// not enforced — SQLite has no equivalent mechanism.
171+
///
172+
/// # Panics
173+
///
174+
/// Panics if `d` rounds to 0 ms. See [`with_read_timeout`](Self::with_read_timeout).
164175
#[must_use]
165-
pub const fn with_write_timeout(mut self, d: Duration) -> Self {
176+
pub fn with_write_timeout(mut self, d: Duration) -> Self {
177+
assert!(d.as_millis() >= 1, "write_timeout must be >= 1ms (got {d:?})");
166178
self.write_timeout = d;
167179
self
168180
}
@@ -1571,6 +1583,14 @@ impl ColdStorageWrite for SqlColdBackend {
15711583
}
15721584

15731585
impl ColdStorageBackend for SqlColdBackend {
1586+
fn read_timeout(&self) -> Option<Duration> {
1587+
Some(self.read_timeout)
1588+
}
1589+
1590+
fn write_timeout(&self) -> Option<Duration> {
1591+
Some(self.write_timeout)
1592+
}
1593+
15741594
async fn drain_above(&self, block: BlockNumber) -> ColdResult<Vec<Vec<ColdReceipt>>> {
15751595
let bn = to_i64(block);
15761596
let mut tx = self.begin_write().await.map_err(ColdStorageError::from)?;

crates/cold/src/handle.rs

Lines changed: 77 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,46 @@ use tracing::Instrument;
3939
/// Default maximum deadline for streaming operations.
4040
const DEFAULT_MAX_STREAM_DEADLINE: Duration = Duration::from_secs(60);
4141

42+
/// Default fallback for the stream-setup `get_latest_block` deadline
43+
/// when the backend does not advertise a [`read_timeout`]. Picked to
44+
/// match the SQL/MDBX defaults so behaviour is predictable.
45+
///
46+
/// [`read_timeout`]: crate::ColdStorageBackend::read_timeout
47+
const DEFAULT_STREAM_SETUP_TIMEOUT: Duration = Duration::from_millis(500);
48+
49+
/// Emit an advisory WARN if a successful write exceeded its end-to-end
50+
/// SLO target. Only fires on `Ok`: a failed write already surfaces an
51+
/// error to the caller, and a noisy overrun WARN on top would poison
52+
/// alerting built on this signal.
53+
fn warn_on_write_overrun(
54+
op: &'static str,
55+
elapsed: Duration,
56+
threshold: Option<Duration>,
57+
is_ok: bool,
58+
) {
59+
let Some(threshold) = threshold else { return };
60+
if is_ok && elapsed > threshold {
61+
tracing::warn!(
62+
op,
63+
elapsed_ms = elapsed.as_millis() as u64,
64+
threshold_ms = threshold.as_millis() as u64,
65+
"cold write exceeded end-to-end write timeout (queue + drain + commit)",
66+
);
67+
}
68+
}
69+
70+
/// Log a `JoinError` from a tracked spawn before mapping to
71+
/// [`ColdStorageError::TaskTerminated`]. A panic inside the spawned body
72+
/// is otherwise indistinguishable from graceful shutdown for the
73+
/// caller, which is a poor on-call signal.
74+
fn log_join_error(op: &'static str, e: &tokio::task::JoinError) {
75+
if e.is_panic() {
76+
tracing::error!(op, error = %e, "cold storage spawned task panicked");
77+
} else if e.is_cancelled() {
78+
tracing::debug!(op, "cold storage spawned task cancelled");
79+
}
80+
}
81+
4282
/// Maximum concurrent read operations.
4383
const MAX_CONCURRENT_READERS: usize = 64;
4484

@@ -170,7 +210,10 @@ impl<B: ColdStorageBackend> ColdStorage<B> {
170210
.in_current_span(),
171211
)
172212
.await
173-
.map_err(|_| ColdStorageError::TaskTerminated)?
213+
.map_err(|e| {
214+
log_join_error(op, &e);
215+
ColdStorageError::TaskTerminated
216+
})?
174217
}
175218

176219
/// Spawn a write task under the `write_sem` permit, holding a full drain
@@ -189,15 +232,22 @@ impl<B: ColdStorageBackend> ColdStorage<B> {
189232
F: FnOnce(Arc<Inner<B>>) -> Fut + Send + 'static,
190233
Fut: std::future::Future<Output = ColdResult<T>> + Send,
191234
{
192-
let wait = Instant::now();
235+
// End-to-end SLO start: capture before permit acquisition so the
236+
// measurement covers `write_sem` queueing and the read drain in
237+
// addition to the backend commit. This is the failure shape the
238+
// PR targets — a slow drain followed by a fast commit must surface
239+
// as an SLO violation, not as a sub-threshold backend timing.
240+
let e2e_start = Instant::now();
241+
let threshold = self.inner.backend.write_timeout();
242+
193243
let write_permit = self
194244
.inner
195245
.write_sem
196246
.clone()
197247
.acquire_owned()
198248
.await
199249
.map_err(|_| ColdStorageError::TaskTerminated)?;
200-
metrics::record_permit_wait("write", wait.elapsed());
250+
metrics::record_permit_wait("write", e2e_start.elapsed());
201251

202252
let drain_wait = Instant::now();
203253
let drain = self
@@ -223,12 +273,16 @@ impl<B: ColdStorageBackend> ColdStorage<B> {
223273
if let Err(ref e) = result {
224274
metrics::record_op_error(op, e.kind());
225275
}
276+
warn_on_write_overrun(op, e2e_start.elapsed(), threshold, result.is_ok());
226277
result
227278
}
228279
.in_current_span(),
229280
)
230281
.await
231-
.map_err(|_| ColdStorageError::TaskTerminated)?
282+
.map_err(|e| {
283+
log_join_error(op, &e);
284+
ColdStorageError::TaskTerminated
285+
})?
232286
}
233287

234288
// ==========================================================================
@@ -518,15 +572,28 @@ impl<B: ColdStorageBackend> ColdStorage<B> {
518572
// bypass `read_sem` and the drain barrier: a stream asking for
519573
// "latest" should observe latest at setup time even alongside an
520574
// in-flight write.
575+
//
576+
// Wrap the setup read in a wall-clock timeout so a stuck backend
577+
// (cold MDBX page, saturated PG pool) cannot stall N concurrent
578+
// setup callers indefinitely. The future drops on timeout but the
579+
// backend work continues — same trade-off the rest of the design
580+
// accepts.
521581
let to = match filter.get_to_block() {
522582
Some(to) => to,
523-
None => match self.inner.backend.get_latest_block().await? {
524-
Some(latest) => latest,
525-
None => {
526-
let (_tx, rx) = mpsc::channel(1);
527-
return Ok(ReceiverStream::new(rx));
583+
None => {
584+
let setup_to =
585+
self.inner.backend.read_timeout().unwrap_or(DEFAULT_STREAM_SETUP_TIMEOUT);
586+
let latest = tokio::time::timeout(setup_to, self.inner.backend.get_latest_block())
587+
.await
588+
.map_err(|_| ColdStorageError::DeadlineExceeded(setup_to))??;
589+
match latest {
590+
Some(latest) => latest,
591+
None => {
592+
let (_tx, rx) = mpsc::channel(1);
593+
return Ok(ReceiverStream::new(rx));
594+
}
528595
}
529-
},
596+
}
530597
};
531598

532599
let wait = Instant::now();

0 commit comments

Comments
 (0)