From c6c68628818554c2179a2f0dcd59ce1cd6a99fc4 Mon Sep 17 00:00:00 2001 From: Filippo Pacifici Date: Wed, 6 May 2026 17:39:42 -0700 Subject: [PATCH 1/6] feat(streams): add consumer watermark commit latency metric Optional message_time on watermarks captures the last data message time at the pipeline head and the oldest row time for batch synthetic watermarks. WatermarkCommitOffsets records histogram streams.pipeline.consumer.watermark_commit_latency on commit (seconds since message_time, or 0 when unset). Co-authored-by: Cursor --- .../adapters/arroyo/rust_arroyo.py | 8 +- .../sentry_streams/rust_streams.pyi | 3 + sentry_streams/src/batch_step.rs | 60 ++++++++++- sentry_streams/src/commit_policy.rs | 17 +++- sentry_streams/src/dev_null_sink.rs | 6 +- sentry_streams/src/fake_strategy.rs | 2 + sentry_streams/src/filter_step.rs | 2 +- sentry_streams/src/header_filter_step.rs | 2 +- sentry_streams/src/messages.rs | 99 ++++++++++++++++--- sentry_streams/src/python_operator.rs | 2 +- sentry_streams/src/sinks.rs | 6 +- sentry_streams/src/testutils.rs | 13 ++- sentry_streams/src/transformer.rs | 2 +- sentry_streams/src/watermark.rs | 13 ++- .../arroyo/helpers/message_helpers.py | 5 +- 15 files changed, 211 insertions(+), 29 deletions(-) diff --git a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py index 0d0b8647..689612bd 100644 --- a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py +++ b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py @@ -81,7 +81,13 @@ def build_py_metrics_config(cfg: MetricsConfig) -> PyMetricConfig | None: - """Build Rust-side DogStatsD config from the same metrics dict used by configure_metrics.""" + """Build Rust-side DogStatsD config from the same metrics dict used by configure_metrics. + + When this is installed, the consumer also emits + ``streams.pipeline.consumer.watermark_commit_latency`` (histogram, seconds): wall time from + optional watermark ``message_time`` (epoch seconds of the last or oldest batched data row) to + the commit decision; 0 is recorded when ``message_time`` is absent. + """ if cfg["type"] != "datadog": return None return PyMetricConfig( diff --git a/sentry_streams/sentry_streams/rust_streams.pyi b/sentry_streams/sentry_streams/rust_streams.pyi index 84257e98..4fc1f7a3 100644 --- a/sentry_streams/sentry_streams/rust_streams.pyi +++ b/sentry_streams/sentry_streams/rust_streams.pyi @@ -200,8 +200,11 @@ class PyWatermark: self, payload: dict[tuple[str, int], int], timestamp: int, + message_time: int | None = None, ) -> None: ... @property def committable(self) -> dict[tuple[str, int], int]: ... @property def timestamp(self) -> int: ... + @property + def message_time(self) -> int | None: ... diff --git a/sentry_streams/src/batch_step.rs b/sentry_streams/src/batch_step.rs index c2a4d3a8..038c1c97 100644 --- a/sentry_streams/src/batch_step.rs +++ b/sentry_streams/src/batch_step.rs @@ -5,7 +5,10 @@ //! element. Watermark handling and backpressure are unchanged. //! //! The GIL is taken only to build the list on flush (after [`Message::into_payload`] in submit). -use crate::messages::{into_pyany, PyAnyMessage, PyStreamingMessage, RoutedValuePayload}; +use crate::messages::{ + into_pyany, streaming_payload_timestamp_floor_secs, PyAnyMessage, PyStreamingMessage, + RoutedValuePayload, +}; use crate::pipeline_stats::get_stats; use crate::routes::{Route, RoutedValue}; use crate::time_helpers::current_epoch; @@ -126,6 +129,21 @@ impl Batch { self.batch_offsets.clone() } + /// Minimum of per-row logical timestamps (epoch seconds, floor), for synthetic watermarks. + pub fn oldest_message_time_secs(&self) -> Option { + if self.elements.is_empty() { + return None; + } + traced_with_gil!(|py| { + let mut min_t: Option = None; + for el in &self.elements { + let t = streaming_payload_timestamp_floor_secs(py, el); + min_t = Some(min_t.map_or(t, |m| m.min(t))); + } + min_t + }) + } + pub fn flush(&self) -> Result, StrategyError> { let route = self.route.clone(); let committable = self.batch_offsets.clone(); @@ -258,6 +276,7 @@ impl BatchStep { // We create a synthetic watermark to avoid waiting for the next batch to complete before // allowing the consumer to commit. let committable_for_synthetic = b.current_offsets_snapshot(); + let batch_message_time = b.oldest_message_time_secs(); let flush_start = Instant::now(); let batch_msg = b.flush()?; get_stats().step_timing(&self.step_name, flush_start.elapsed().as_secs_f64()); @@ -266,7 +285,7 @@ impl BatchStep { self.outbound.push_back(batch_msg); self.pending_batch = true; - self.enqueue_watermark_tail(wm_after_batch, committable_for_synthetic); + self.enqueue_watermark_tail(wm_after_batch, committable_for_synthetic, batch_message_time); Ok(()) } @@ -274,6 +293,7 @@ impl BatchStep { &mut self, wm_after_batch: Vec>, committable: BTreeMap, + message_time: Option, ) { for m in wm_after_batch { self.outbound.push_back(m); @@ -282,7 +302,11 @@ impl BatchStep { let wmk = Message::new_any_message( RoutedValue { route: self.route.clone(), - payload: RoutedValuePayload::make_watermark_payload(committable.clone(), ts), + payload: RoutedValuePayload::make_watermark_payload( + committable.clone(), + ts, + message_time, + ), }, committable, ); @@ -584,7 +608,9 @@ mod tests { use super::super::{BatchStep, Message}; use crate::fake_strategy::FakeStrategy; - use crate::testutils::{build_raw_routed_value, build_routed_value}; + use crate::testutils::{ + build_raw_routed_value, build_routed_value, build_routed_value_with_timestamp, + }; use crate::utils::traced_with_gil; use chrono::Utc; use pyo3::prelude::*; @@ -727,7 +753,7 @@ mod tests { let (mut step, _out, wms) = batch_step_with_fake(route.clone(), None, None); let rv = crate::routes::RoutedValue { route, - payload: RoutedValuePayload::make_watermark_payload(BTreeMap::new(), 0), + payload: RoutedValuePayload::make_watermark_payload(BTreeMap::new(), 0, None), }; let m = Message::new_any_message(rv, BTreeMap::new()); step.submit(m) @@ -735,6 +761,30 @@ mod tests { assert_eq!(wms.lock().unwrap().len(), 1); } + #[test] + fn synthetic_watermark_uses_oldest_row_timestamp() { + let route = Route::new("s".into(), vec!["w".into()]); + let (mut step, _out, wms) = batch_step_with_fake(route, Some(2), None); + traced_with_gil!(|py| { + let p1 = 1i32.into_pyobject(py).unwrap().into_any().unbind(); + let p2 = 2i32.into_pyobject(py).unwrap().into_any().unbind(); + let m1 = Message::new_any_message( + build_routed_value_with_timestamp(py, p1, "s", vec!["w".into()], 5000.25), + BTreeMap::new(), + ); + let m2 = Message::new_any_message( + build_routed_value_with_timestamp(py, p2, "s", vec!["w".into()], 9000.9), + BTreeMap::new(), + ); + step.submit(m1).unwrap(); + step.submit(m2).unwrap(); + step.poll().unwrap(); + }); + let w = wms.lock().unwrap(); + assert_eq!(w.len(), 1); + assert_eq!(w[0].message_time, Some(5000)); + } + #[test] fn submit_rejects_while_batch_stalled_in_outbound() { let route = Route::new("s".into(), vec!["w".into()]); diff --git a/sentry_streams/src/commit_policy.rs b/sentry_streams/src/commit_policy.rs index 3ecca9f0..5363a414 100644 --- a/sentry_streams/src/commit_policy.rs +++ b/sentry_streams/src/commit_policy.rs @@ -8,6 +8,10 @@ use sentry_arroyo::types::{Message, Partition}; use crate::messages::{RoutedValuePayload, WatermarkMessage}; use crate::routes::RoutedValue; +use crate::time_helpers::current_epoch; + +/// Histogram: seconds from watermark `message_time` (or 0 if absent) to commit decision. +const METRIC_WATERMARK_COMMIT_LATENCY: &str = "streams.pipeline.consumer.watermark_commit_latency"; /// Records the committable of a received Watermark and records how many times that watermark has been seen. #[derive(Clone, Debug)] @@ -15,6 +19,7 @@ struct WatermarkTracker { num_watermarks: u64, committable: HashMap, time_added: Instant, + message_time: Option, } /// WatermarkCommitOffsets is a commit policy that only commits once it receives a copy of a Watermark @@ -59,6 +64,14 @@ impl WatermarkCommitOffsets { }; commit_request = merge_commit_request(Some(commit_request), Some(current_request)).unwrap(); + + let now = current_epoch(); + let secs = watermark + .message_time + .map(|t| now.saturating_sub(t) as f64) + .unwrap_or(0.0); + metrics::histogram!(METRIC_WATERMARK_COMMIT_LATENCY).record(secs); + to_remove.push(ts.clone()); // Clean up any hanging watermarks which still haven't gotten all their copies in 5 min // from when the first copy was seen @@ -94,7 +107,8 @@ impl ProcessingStrategy for WatermarkCommitOffsets { WatermarkTracker { num_watermarks: tracker.num_watermarks + 1, committable: tracker.committable.clone(), - time_added: tracker.time_added.clone(), + time_added: tracker.time_added, + message_time: tracker.message_time, }, ); } @@ -109,6 +123,7 @@ impl ProcessingStrategy for WatermarkCommitOffsets { num_watermarks: 1, committable: committable, time_added: Instant::now(), + message_time: watermark.message_time, }, ); } diff --git a/sentry_streams/src/dev_null_sink.rs b/sentry_streams/src/dev_null_sink.rs index 85cfcaca..851437e1 100644 --- a/sentry_streams/src/dev_null_sink.rs +++ b/sentry_streams/src/dev_null_sink.rs @@ -234,7 +234,11 @@ mod tests { let watermark_msg = sentry_arroyo::types::Message::new_any_message( crate::routes::RoutedValue { route: Route::new("source".to_string(), vec!["wp1".to_string()]), - payload: RoutedValuePayload::make_watermark_payload(watermark_payload.clone(), 0), + payload: RoutedValuePayload::make_watermark_payload( + watermark_payload.clone(), + 0, + None, + ), }, BTreeMap::new(), ); diff --git a/sentry_streams/src/fake_strategy.rs b/sentry_streams/src/fake_strategy.rs index 9ce28008..e66703ec 100644 --- a/sentry_streams/src/fake_strategy.rs +++ b/sentry_streams/src/fake_strategy.rs @@ -138,5 +138,7 @@ pub fn assert_watermarks_match(expected_messages: Vec, actual_message for (actual, expected) in actual_messages.iter().zip(expected_messages.iter()) { assert_eq!(actual.committable, expected.committable); + assert_eq!(actual.timestamp, expected.timestamp); + assert_eq!(actual.message_time, expected.message_time); } } diff --git a/sentry_streams/src/filter_step.rs b/sentry_streams/src/filter_step.rs index a0bf5598..0cfb15b9 100644 --- a/sentry_streams/src/filter_step.rs +++ b/sentry_streams/src/filter_step.rs @@ -294,7 +294,7 @@ mod tests { let watermark_val = RoutedValue { route: Route::new(String::from("source"), vec![]), - payload: RoutedValuePayload::make_watermark_payload(BTreeMap::new(), 0), + payload: RoutedValuePayload::make_watermark_payload(BTreeMap::new(), 0, None), }; let watermark_msg = Message::new_any_message(watermark_val, BTreeMap::new()); let watermark_res = strategy.submit(watermark_msg); diff --git a/sentry_streams/src/header_filter_step.rs b/sentry_streams/src/header_filter_step.rs index ebffcde7..25239118 100644 --- a/sentry_streams/src/header_filter_step.rs +++ b/sentry_streams/src/header_filter_step.rs @@ -390,7 +390,7 @@ mod tests { let watermark_val = RoutedValue { route: Route::new(String::from("source"), vec![]), - payload: RoutedValuePayload::make_watermark_payload(BTreeMap::new(), 0), + payload: RoutedValuePayload::make_watermark_payload(BTreeMap::new(), 0, None), }; let watermark_msg = Message::new_any_message(watermark_val, BTreeMap::new()); assert!(strategy.submit(watermark_msg).is_ok()); diff --git a/sentry_streams/src/messages.rs b/sentry_streams/src/messages.rs index 4cf246b9..e1ef43af 100644 --- a/sentry_streams/src/messages.rs +++ b/sentry_streams/src/messages.rs @@ -39,9 +39,10 @@ use pyo3::types::{PyBytes, PyDict, PyInt, PyList, PyTuple}; use pyo3::Python; use pyo3::{prelude::*, types::PySequence, IntoPyObjectExt}; -use sentry_arroyo::types::Partition; +use sentry_arroyo::types::{Message, Partition}; use crate::committable::{convert_committable_to_py, convert_py_committable}; +use crate::routes::RoutedValue; use crate::utils::traced_with_gil; // Used from `mod tests` only; the library does not read headers from the Python constructor. @@ -119,9 +120,14 @@ impl Clone for WatermarkMessage { traced_with_gil!(|py| { let committable = py_watermark.committable.clone_ref(py); let timestamp = py_watermark.timestamp.clone_ref(py); + let message_time = py_watermark + .message_time + .as_ref() + .map(|m| m.clone_ref(py)); WatermarkMessage::PyWatermark(PyWatermark { committable, timestamp, + message_time, }) }) } @@ -134,6 +140,9 @@ impl Clone for WatermarkMessage { pub struct Watermark { pub committable: BTreeMap, pub timestamp: u64, + /// Unix epoch seconds (floor) from the newest data message since the previous watermark, + /// or the oldest message in a batch for synthetic batch watermarks. `None` if no such message. + pub message_time: Option, } impl Watermark { @@ -141,6 +150,19 @@ impl Watermark { Self { committable, timestamp, + message_time: None, + } + } + + pub fn with_message_time( + committable: BTreeMap, + timestamp: u64, + message_time: Option, + ) -> Self { + Self { + committable, + timestamp, + message_time, } } } @@ -154,15 +176,23 @@ pub struct PyWatermark { pub committable: Py, #[pyo3(get)] pub timestamp: Py, + #[pyo3(get)] + pub message_time: Option>, } #[pymethods] impl PyWatermark { #[new] - pub fn new(committable: Py, timestamp: Py) -> PyResult { + #[pyo3(signature = (committable, timestamp, message_time=None))] + pub fn new( + committable: Py, + timestamp: Py, + message_time: Option>, + ) -> PyResult { Ok(Self { committable, timestamp, + message_time, }) } } @@ -388,11 +418,37 @@ impl RoutedValuePayload { } } - pub fn make_watermark_payload(committable: BTreeMap, timestamp: u64) -> Self { - RoutedValuePayload::WatermarkMessage(WatermarkMessage::Watermark(Watermark::new( - committable, - timestamp, - ))) + pub fn make_watermark_payload( + committable: BTreeMap, + timestamp: u64, + message_time: Option, + ) -> Self { + RoutedValuePayload::WatermarkMessage(WatermarkMessage::Watermark( + Watermark::with_message_time(committable, timestamp, message_time), + )) + } +} + +/// Unix epoch seconds (floor) from a streaming payload (requires GIL). +pub fn streaming_payload_timestamp_floor_secs(py: Python<'_>, sm: &PyStreamingMessage) -> u64 { + match sm { + PyStreamingMessage::PyAnyMessage { content } => { + content.bind(py).borrow().timestamp as u64 + } + PyStreamingMessage::RawMessage { content } => { + content.bind(py).borrow().timestamp as u64 + } + } +} + +/// For a routed broker/data message, the logical message time used for watermark `message_time`. +pub fn data_message_time_secs_from_routed(message: &Message) -> Option { + let rv = message.payload(); + match &rv.payload { + RoutedValuePayload::WatermarkMessage(_) => None, + RoutedValuePayload::PyStreamingMessage(ref sm) => { + Some(traced_with_gil!(|py| streaming_payload_timestamp_floor_secs(py, sm))) + } } } @@ -416,6 +472,9 @@ impl From<&WatermarkMessage> for Py { WatermarkMessage::Watermark(watermark) => PyWatermark::new( convert_committable_to_py(py, watermark.committable.clone()).unwrap(), make_py_int(py, watermark.timestamp), + watermark + .message_time + .map(|t| make_py_int(py, t)), ) .unwrap() .into_py_any(py) @@ -423,6 +482,10 @@ impl From<&WatermarkMessage> for Py { WatermarkMessage::PyWatermark(watermark) => PyWatermark::new( watermark.committable.clone_ref(py), watermark.timestamp.clone_ref(py), + watermark + .message_time + .as_ref() + .map(|m| m.clone_ref(py)), ) .unwrap() .into_py_any(py) @@ -500,9 +563,17 @@ impl TryFrom> for WatermarkMessage { Err(e) => return Err(pyo3::exceptions::PyTypeError::new_err(e.to_string())), }; - Ok(WatermarkMessage::Watermark(Watermark::new( + let message_time = match &py_watermark.borrow().message_time { + None => None, + Some(py_int) => Some(py_int.bind(py).extract::().map_err(|e| { + pyo3::exceptions::PyTypeError::new_err(e.to_string()) + })?), + }; + + Ok(WatermarkMessage::Watermark(Watermark::with_message_time( committable, timestamp, + message_time, ))) } else { Err(pyo3::exceptions::PyTypeError::new_err(format!( @@ -708,8 +779,12 @@ mod tests { let _ = committable.set_item(key.unwrap(), 0); // Create PyAnyMessage - let msg = - PyWatermark::new(committable.unbind().clone_ref(py), make_py_int(py, 0)).unwrap(); + let msg = PyWatermark::new( + committable.unbind().clone_ref(py), + make_py_int(py, 0), + None, + ) + .unwrap(); // Check payload let payload_val: BTreeMap<(String, u64), u64> = @@ -723,7 +798,7 @@ mod tests { #[test] fn test_is_watermark_message() { - let wmsg = RoutedValuePayload::make_watermark_payload(BTreeMap::new(), 0); + let wmsg = RoutedValuePayload::make_watermark_payload(BTreeMap::new(), 0, None); assert!(wmsg.is_watermark_msg()); } @@ -756,7 +831,7 @@ mod tests { #[test] #[should_panic] fn test_unwrap_payload_watermark_msg() { - let wmsg = RoutedValuePayload::make_watermark_payload(BTreeMap::new(), 0); + let wmsg = RoutedValuePayload::make_watermark_payload(BTreeMap::new(), 0, None); wmsg.unwrap_payload(); } } diff --git a/sentry_streams/src/python_operator.rs b/sentry_streams/src/python_operator.rs index 1a62a620..d7d094f6 100644 --- a/sentry_streams/src/python_operator.rs +++ b/sentry_streams/src/python_operator.rs @@ -472,7 +472,7 @@ class RustOperatorDelegateFactory: let watermark_val = RoutedValue { route: Route::new(String::from("source"), vec![]), - payload: RoutedValuePayload::make_watermark_payload(BTreeMap::new(), 0), + payload: RoutedValuePayload::make_watermark_payload(BTreeMap::new(), 0, None), }; let watermark_msg = Message::new_any_message(watermark_val, BTreeMap::new()); let watermark_res = operator.submit(watermark_msg); diff --git a/sentry_streams/src/sinks.rs b/sentry_streams/src/sinks.rs index dd539087..0aa2e581 100644 --- a/sentry_streams/src/sinks.rs +++ b/sentry_streams/src/sinks.rs @@ -42,6 +42,8 @@ use std::time::Duration; struct SerializableWatermark { committable: HashMap>>, timestamp: u64, + #[serde(default)] + message_time: Option, } impl From for SerializableWatermark { @@ -66,6 +68,7 @@ impl From for SerializableWatermark { SerializableWatermark { committable, timestamp: value.timestamp, + message_time: value.message_time, } } } @@ -84,6 +87,7 @@ impl From for Watermark { Watermark { committable, timestamp: value.timestamp, + message_time: value.message_time, } } } @@ -373,7 +377,7 @@ mod tests { let watermark_val = RoutedValue { route: Route::new(String::from("source"), vec![]), - payload: RoutedValuePayload::make_watermark_payload(BTreeMap::new(), 0), + payload: RoutedValuePayload::make_watermark_payload(BTreeMap::new(), 0, None), }; let watermark_msg = Message::new_any_message(watermark_val, BTreeMap::new()); let watermark_res = sink.submit(watermark_msg); diff --git a/sentry_streams/src/testutils.rs b/sentry_streams/src/testutils.rs index 787abc2b..7f861025 100644 --- a/sentry_streams/src/testutils.rs +++ b/sentry_streams/src/testutils.rs @@ -51,6 +51,17 @@ pub fn build_routed_value( msg_payload: Py, source: &str, waypoints: Vec, +) -> RoutedValue { + build_routed_value_with_timestamp(py, msg_payload, source, waypoints, 0.0) +} + +#[cfg(test)] +pub fn build_routed_value_with_timestamp( + py: Python<'_>, + msg_payload: Py, + source: &str, + waypoints: Vec, + timestamp: f64, ) -> RoutedValue { let route = Route::new(source.to_string(), waypoints); let payload = PyStreamingMessage::PyAnyMessage { @@ -59,7 +70,7 @@ pub fn build_routed_value( PyAnyMessage { payload: msg_payload, headers: vec![], - timestamp: 0.0, + timestamp, schema: None, }, ) diff --git a/sentry_streams/src/transformer.rs b/sentry_streams/src/transformer.rs index 931b5353..6fe3d325 100644 --- a/sentry_streams/src/transformer.rs +++ b/sentry_streams/src/transformer.rs @@ -250,7 +250,7 @@ mod tests { let watermark_val = RoutedValue { route: Route::new(String::from("source"), vec![]), - payload: RoutedValuePayload::make_watermark_payload(BTreeMap::new(), 0), + payload: RoutedValuePayload::make_watermark_payload(BTreeMap::new(), 0, None), }; let watermark_msg = Message::new_any_message(watermark_val, BTreeMap::new()); let watermark_res = strategy.submit(watermark_msg); diff --git a/sentry_streams/src/watermark.rs b/sentry_streams/src/watermark.rs index e9e0df12..de663de3 100644 --- a/sentry_streams/src/watermark.rs +++ b/sentry_streams/src/watermark.rs @@ -1,4 +1,4 @@ -use crate::messages::RoutedValuePayload; +use crate::messages::{data_message_time_secs_from_routed, RoutedValuePayload}; use crate::routes::{Route, RoutedValue}; use sentry_arroyo::processing::strategies::{ CommitRequest, InvalidMessage, ProcessingStrategy, StrategyError, SubmitError, @@ -25,6 +25,8 @@ pub struct WatermarkEmitter { pub period: u64, pub watermark_committable: BTreeMap, last_sent_timestamp: u64, + /// Latest data message time (epoch seconds) since the last emitted watermark. + last_data_message_time: Option, } impl WatermarkEmitter { @@ -41,6 +43,7 @@ impl WatermarkEmitter { period, watermark_committable: empty_committable, last_sent_timestamp: current_timestamp, + last_data_message_time: None, } } @@ -50,11 +53,13 @@ impl WatermarkEmitter { fn send_watermark_msg(&mut self) -> Result<(), InvalidMessage> { let timestamp = current_epoch(); + let message_time = self.last_data_message_time; let watermark_msg = RoutedValue { route: self.route.clone(), payload: RoutedValuePayload::make_watermark_payload( self.watermark_committable.clone(), timestamp, + message_time, ), }; let result = self.next_step.submit(Message::new_any_message( @@ -65,6 +70,7 @@ impl WatermarkEmitter { Ok(..) => { self.last_sent_timestamp = timestamp; self.watermark_committable = BTreeMap::new(); + self.last_data_message_time = None; Ok(()) } Err(err) => match err { @@ -97,6 +103,9 @@ impl ProcessingStrategy for WatermarkEmitter { fn submit(&mut self, message: Message) -> Result<(), SubmitError> { self.merge_watermark_committable(&message); + if let Some(t) = data_message_time_secs_from_routed(&message) { + self.last_data_message_time = Some(t); + } self.next_step.submit(message) } @@ -186,7 +195,7 @@ mod tests { set_timestamp(20); let _ = watermark.poll(); assert_watermarks_match( - vec![Watermark::new(expected_committable, 0)], + vec![Watermark::with_message_time(expected_committable, 20, Some(0))], submitted_watermarks_clone.lock().unwrap().deref(), ); set_timestamp(0); diff --git a/sentry_streams/tests/adapters/arroyo/helpers/message_helpers.py b/sentry_streams/tests/adapters/arroyo/helpers/message_helpers.py index d0fd12f9..e5f47896 100644 --- a/sentry_streams/tests/adapters/arroyo/helpers/message_helpers.py +++ b/sentry_streams/tests/adapters/arroyo/helpers/message_helpers.py @@ -106,12 +106,15 @@ def build_py_msg( def build_watermark( - committable: Committable, timestamp: int + committable: Committable, + timestamp: int, + message_time: int | None = None, ) -> Tuple[PipelineMessage, Committable]: return ( PyWatermark( committable, timestamp, + message_time, ), committable, ) From 4a9756b04a0c728affa7c68a41929015f94092f3 Mon Sep 17 00:00:00 2001 From: Filippo Pacifici Date: Thu, 7 May 2026 14:39:55 -0700 Subject: [PATCH 2/6] style: cargo fmt Co-authored-by: Cursor --- sentry_streams/src/batch_step.rs | 6 ++++- sentry_streams/src/messages.rs | 46 ++++++++++++-------------------- sentry_streams/src/watermark.rs | 6 ++++- 3 files changed, 27 insertions(+), 31 deletions(-) diff --git a/sentry_streams/src/batch_step.rs b/sentry_streams/src/batch_step.rs index 038c1c97..eca5066e 100644 --- a/sentry_streams/src/batch_step.rs +++ b/sentry_streams/src/batch_step.rs @@ -285,7 +285,11 @@ impl BatchStep { self.outbound.push_back(batch_msg); self.pending_batch = true; - self.enqueue_watermark_tail(wm_after_batch, committable_for_synthetic, batch_message_time); + self.enqueue_watermark_tail( + wm_after_batch, + committable_for_synthetic, + batch_message_time, + ); Ok(()) } diff --git a/sentry_streams/src/messages.rs b/sentry_streams/src/messages.rs index e1ef43af..4c6662d9 100644 --- a/sentry_streams/src/messages.rs +++ b/sentry_streams/src/messages.rs @@ -120,10 +120,7 @@ impl Clone for WatermarkMessage { traced_with_gil!(|py| { let committable = py_watermark.committable.clone_ref(py); let timestamp = py_watermark.timestamp.clone_ref(py); - let message_time = py_watermark - .message_time - .as_ref() - .map(|m| m.clone_ref(py)); + let message_time = py_watermark.message_time.as_ref().map(|m| m.clone_ref(py)); WatermarkMessage::PyWatermark(PyWatermark { committable, timestamp, @@ -432,12 +429,8 @@ impl RoutedValuePayload { /// Unix epoch seconds (floor) from a streaming payload (requires GIL). pub fn streaming_payload_timestamp_floor_secs(py: Python<'_>, sm: &PyStreamingMessage) -> u64 { match sm { - PyStreamingMessage::PyAnyMessage { content } => { - content.bind(py).borrow().timestamp as u64 - } - PyStreamingMessage::RawMessage { content } => { - content.bind(py).borrow().timestamp as u64 - } + PyStreamingMessage::PyAnyMessage { content } => content.bind(py).borrow().timestamp as u64, + PyStreamingMessage::RawMessage { content } => content.bind(py).borrow().timestamp as u64, } } @@ -446,9 +439,9 @@ pub fn data_message_time_secs_from_routed(message: &Message) -> Opt let rv = message.payload(); match &rv.payload { RoutedValuePayload::WatermarkMessage(_) => None, - RoutedValuePayload::PyStreamingMessage(ref sm) => { - Some(traced_with_gil!(|py| streaming_payload_timestamp_floor_secs(py, sm))) - } + RoutedValuePayload::PyStreamingMessage(ref sm) => Some(traced_with_gil!(|py| { + streaming_payload_timestamp_floor_secs(py, sm) + })), } } @@ -472,9 +465,7 @@ impl From<&WatermarkMessage> for Py { WatermarkMessage::Watermark(watermark) => PyWatermark::new( convert_committable_to_py(py, watermark.committable.clone()).unwrap(), make_py_int(py, watermark.timestamp), - watermark - .message_time - .map(|t| make_py_int(py, t)), + watermark.message_time.map(|t| make_py_int(py, t)), ) .unwrap() .into_py_any(py) @@ -482,10 +473,7 @@ impl From<&WatermarkMessage> for Py { WatermarkMessage::PyWatermark(watermark) => PyWatermark::new( watermark.committable.clone_ref(py), watermark.timestamp.clone_ref(py), - watermark - .message_time - .as_ref() - .map(|m| m.clone_ref(py)), + watermark.message_time.as_ref().map(|m| m.clone_ref(py)), ) .unwrap() .into_py_any(py) @@ -565,9 +553,12 @@ impl TryFrom> for WatermarkMessage { let message_time = match &py_watermark.borrow().message_time { None => None, - Some(py_int) => Some(py_int.bind(py).extract::().map_err(|e| { - pyo3::exceptions::PyTypeError::new_err(e.to_string()) - })?), + Some(py_int) => Some( + py_int + .bind(py) + .extract::() + .map_err(|e| pyo3::exceptions::PyTypeError::new_err(e.to_string()))?, + ), }; Ok(WatermarkMessage::Watermark(Watermark::with_message_time( @@ -779,12 +770,9 @@ mod tests { let _ = committable.set_item(key.unwrap(), 0); // Create PyAnyMessage - let msg = PyWatermark::new( - committable.unbind().clone_ref(py), - make_py_int(py, 0), - None, - ) - .unwrap(); + let msg = + PyWatermark::new(committable.unbind().clone_ref(py), make_py_int(py, 0), None) + .unwrap(); // Check payload let payload_val: BTreeMap<(String, u64), u64> = diff --git a/sentry_streams/src/watermark.rs b/sentry_streams/src/watermark.rs index de663de3..c55d4fc1 100644 --- a/sentry_streams/src/watermark.rs +++ b/sentry_streams/src/watermark.rs @@ -195,7 +195,11 @@ mod tests { set_timestamp(20); let _ = watermark.poll(); assert_watermarks_match( - vec![Watermark::with_message_time(expected_committable, 20, Some(0))], + vec![Watermark::with_message_time( + expected_committable, + 20, + Some(0), + )], submitted_watermarks_clone.lock().unwrap().deref(), ); set_timestamp(0); From a83b271fc54d43c4c65a51bb7cb3ae6a977cca60 Mon Sep 17 00:00:00 2001 From: Filippo Pacifici Date: Thu, 7 May 2026 16:52:09 -0700 Subject: [PATCH 3/6] ref(streams): Use f64 for watermark message_time Address review on PR #314: align watermark message_time with the codebase convention of f64 message timestamps so we no longer floor sub-second precision when threading the value through native code, serialization, and Python. - Watermark, SerializableWatermark, PyWatermark, make_watermark_payload, Watermark::with_message_time and the Py<->Rust conversions now carry message_time as Option (Python float). - WatermarkEmitter and Batch read PyAnyMessage/RawMessage timestamps directly as f64; the streaming_payload_timestamp_floor_secs and data_message_time_secs_from_routed helpers are removed. - WatermarkCommitOffsets latency math becomes (current_epoch as f64) - oldest_message_time, clamped at 0 to absorb small clock skew. - rust_streams.pyi and the build_watermark test helper expose message_time as float | None. Drop the build_py_metrics_config docstring paragraph documenting the new metric (per review). Co-Authored-By: Claude Opus 4.7 Co-authored-by: Cursor --- .../adapters/arroyo/rust_arroyo.py | 8 +- .../sentry_streams/rust_streams.pyi | 4 +- sentry_streams/src/batch_step.rs | 25 ++-- sentry_streams/src/commit_policy.rs | 108 +++++++++++++++--- sentry_streams/src/messages.rs | 53 +++------ sentry_streams/src/sinks.rs | 2 +- sentry_streams/src/watermark.rs | 18 ++- .../arroyo/helpers/message_helpers.py | 2 +- 8 files changed, 140 insertions(+), 80 deletions(-) diff --git a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py index 689612bd..0d0b8647 100644 --- a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py +++ b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py @@ -81,13 +81,7 @@ def build_py_metrics_config(cfg: MetricsConfig) -> PyMetricConfig | None: - """Build Rust-side DogStatsD config from the same metrics dict used by configure_metrics. - - When this is installed, the consumer also emits - ``streams.pipeline.consumer.watermark_commit_latency`` (histogram, seconds): wall time from - optional watermark ``message_time`` (epoch seconds of the last or oldest batched data row) to - the commit decision; 0 is recorded when ``message_time`` is absent. - """ + """Build Rust-side DogStatsD config from the same metrics dict used by configure_metrics.""" if cfg["type"] != "datadog": return None return PyMetricConfig( diff --git a/sentry_streams/sentry_streams/rust_streams.pyi b/sentry_streams/sentry_streams/rust_streams.pyi index 4fc1f7a3..889450a0 100644 --- a/sentry_streams/sentry_streams/rust_streams.pyi +++ b/sentry_streams/sentry_streams/rust_streams.pyi @@ -200,11 +200,11 @@ class PyWatermark: self, payload: dict[tuple[str, int], int], timestamp: int, - message_time: int | None = None, + message_time: float | None = None, ) -> None: ... @property def committable(self) -> dict[tuple[str, int], int]: ... @property def timestamp(self) -> int: ... @property - def message_time(self) -> int | None: ... + def message_time(self) -> float | None: ... diff --git a/sentry_streams/src/batch_step.rs b/sentry_streams/src/batch_step.rs index eca5066e..d9c90829 100644 --- a/sentry_streams/src/batch_step.rs +++ b/sentry_streams/src/batch_step.rs @@ -5,10 +5,7 @@ //! element. Watermark handling and backpressure are unchanged. //! //! The GIL is taken only to build the list on flush (after [`Message::into_payload`] in submit). -use crate::messages::{ - into_pyany, streaming_payload_timestamp_floor_secs, PyAnyMessage, PyStreamingMessage, - RoutedValuePayload, -}; +use crate::messages::{into_pyany, PyAnyMessage, PyStreamingMessage, RoutedValuePayload}; use crate::pipeline_stats::get_stats; use crate::routes::{Route, RoutedValue}; use crate::time_helpers::current_epoch; @@ -129,15 +126,23 @@ impl Batch { self.batch_offsets.clone() } - /// Minimum of per-row logical timestamps (epoch seconds, floor), for synthetic watermarks. - pub fn oldest_message_time_secs(&self) -> Option { + /// Minimum of per-row logical timestamps (epoch seconds, sub-second precision), for synthetic + /// watermarks. + pub fn oldest_message_time_secs(&self) -> Option { if self.elements.is_empty() { return None; } traced_with_gil!(|py| { - let mut min_t: Option = None; + let mut min_t: Option = None; for el in &self.elements { - let t = streaming_payload_timestamp_floor_secs(py, el); + let t = match el { + PyStreamingMessage::PyAnyMessage { content } => { + content.bind(py).borrow().timestamp + } + PyStreamingMessage::RawMessage { content } => { + content.bind(py).borrow().timestamp + } + }; min_t = Some(min_t.map_or(t, |m| m.min(t))); } min_t @@ -297,7 +302,7 @@ impl BatchStep { &mut self, wm_after_batch: Vec>, committable: BTreeMap, - message_time: Option, + message_time: Option, ) { for m in wm_after_batch { self.outbound.push_back(m); @@ -786,7 +791,7 @@ mod tests { }); let w = wms.lock().unwrap(); assert_eq!(w.len(), 1); - assert_eq!(w[0].message_time, Some(5000)); + assert_eq!(w[0].message_time, Some(5000.25)); } #[test] diff --git a/sentry_streams/src/commit_policy.rs b/sentry_streams/src/commit_policy.rs index 5363a414..603435c9 100644 --- a/sentry_streams/src/commit_policy.rs +++ b/sentry_streams/src/commit_policy.rs @@ -7,7 +7,10 @@ use sentry_arroyo::processing::strategies::{ use sentry_arroyo::types::{Message, Partition}; use crate::messages::{RoutedValuePayload, WatermarkMessage}; +#[cfg(test)] +use crate::mocks::current_epoch; use crate::routes::RoutedValue; +#[cfg(not(test))] use crate::time_helpers::current_epoch; /// Histogram: seconds from watermark `message_time` (or 0 if absent) to commit decision. @@ -19,7 +22,7 @@ struct WatermarkTracker { num_watermarks: u64, committable: HashMap, time_added: Instant, - message_time: Option, + message_time: Option, } /// WatermarkCommitOffsets is a commit policy that only commits once it receives a copy of a Watermark @@ -57,6 +60,10 @@ impl WatermarkCommitOffsets { }; let mut to_remove = vec![]; let mut commit_request = empty_commit_request.clone(); + // Track the oldest (minimum) message_time across all watermarks that contribute + // to the merged commit, so we record latency once per commit() based on the + // watermark furthest behind. + let mut oldest_message_time: Option = None; for (ts, watermark) in self.watermarks.iter() { if watermark.num_watermarks == self.num_branches { let current_request = CommitRequest { @@ -65,12 +72,12 @@ impl WatermarkCommitOffsets { commit_request = merge_commit_request(Some(commit_request), Some(current_request)).unwrap(); - let now = current_epoch(); - let secs = watermark - .message_time - .map(|t| now.saturating_sub(t) as f64) - .unwrap_or(0.0); - metrics::histogram!(METRIC_WATERMARK_COMMIT_LATENCY).record(secs); + if let Some(t) = watermark.message_time { + oldest_message_time = Some(match oldest_message_time { + Some(prev) => prev.min(t), + None => t, + }); + } to_remove.push(ts.clone()); // Clean up any hanging watermarks which still haven't gotten all their copies in 5 min @@ -84,6 +91,10 @@ impl WatermarkCommitOffsets { } if commit_request != empty_commit_request { + let secs = oldest_message_time + .map(|t| ((current_epoch() as f64) - t).max(0.0)) + .unwrap_or(0.0); + metrics::histogram!(METRIC_WATERMARK_COMMIT_LATENCY).record(secs); Some(commit_request) } else { None @@ -144,15 +155,60 @@ impl ProcessingStrategy for WatermarkCommitOffsets { #[cfg(test)] mod tests { - use crate::{messages::Watermark, routes::Route, testutils::make_committable}; + use crate::messages::Watermark; + use crate::mocks::set_timestamp; + use crate::{routes::Route, testutils::make_committable}; + + use metrics::{Key, KeyName, Metadata, Recorder, SharedString, Unit}; + use std::sync::{Arc, Mutex}; use super::*; + /// Minimal histogram-only recorder modeled on `pipeline_stats::tests::CaptureRecorder`. + #[derive(Default)] + struct CaptureRecorder { + histograms: Arc>>, + } + + impl Recorder for CaptureRecorder { + fn describe_counter(&self, _: KeyName, _: Option, _: SharedString) {} + fn describe_gauge(&self, _: KeyName, _: Option, _: SharedString) {} + fn describe_histogram(&self, _: KeyName, _: Option, _: SharedString) {} + fn register_counter(&self, _: &Key, _: &Metadata<'_>) -> metrics::Counter { + metrics::Counter::noop() + } + fn register_gauge(&self, _: &Key, _: &Metadata<'_>) -> metrics::Gauge { + metrics::Gauge::noop() + } + fn register_histogram(&self, key: &Key, _: &Metadata<'_>) -> metrics::Histogram { + metrics::Histogram::from_arc(Arc::new(CaptureHistogram { + key: key.clone(), + histograms: Arc::clone(&self.histograms), + })) + } + } + + struct CaptureHistogram { + key: Key, + histograms: Arc>>, + } + + impl metrics::HistogramFn for CaptureHistogram { + fn record(&self, value: f64) { + self.histograms + .lock() + .unwrap() + .push((self.key.clone(), value)); + } + } + #[test] fn test_commit_offsets() { + // Pin current_epoch so the latency metric is deterministic. + set_timestamp(100); let mut commit_step = WatermarkCommitOffsets::new(2); - let watermark = Watermark::new(make_committable(3, 0), 0); + let watermark = Watermark::with_message_time(make_committable(3, 0), 0, Some(80.0)); let mut messages = vec![]; for waypoint in ["route1", "route2"] { messages.push(Message::new_any_message( @@ -180,11 +236,37 @@ mod tests { ); } - // Second watermark actually returns CommitRequest on poll() + // Second watermark actually returns CommitRequest on poll() and records the latency + // metric once based on the (only) tracker's message_time. let _ = commit_step.submit(messages[1].clone()); assert_eq!(commit_step.watermarks[&ts].num_watermarks, 2); - if let Ok(None) = commit_step.poll() { - panic!("Commit step returned didn't return CommitRequest with 2 watermarks"); - } + + let histograms = Arc::new(Mutex::new(Vec::<(Key, f64)>::new())); + let recorder = CaptureRecorder { + histograms: Arc::clone(&histograms), + }; + let result = { + let _guard = metrics::set_default_local_recorder(&recorder); + commit_step.poll() + }; + assert!( + matches!(result, Ok(Some(_))), + "Commit step returned didn't return CommitRequest with 2 watermarks" + ); + + let recorded: Vec = histograms + .lock() + .unwrap() + .iter() + .filter(|(k, _)| k.name() == METRIC_WATERMARK_COMMIT_LATENCY) + .map(|(_, v)| *v) + .collect(); + assert_eq!( + recorded, + vec![20.0], + "expected exactly one latency sample of (current_epoch - message_time)" + ); + + set_timestamp(0); } } diff --git a/sentry_streams/src/messages.rs b/sentry_streams/src/messages.rs index 4c6662d9..b321753a 100644 --- a/sentry_streams/src/messages.rs +++ b/sentry_streams/src/messages.rs @@ -39,10 +39,9 @@ use pyo3::types::{PyBytes, PyDict, PyInt, PyList, PyTuple}; use pyo3::Python; use pyo3::{prelude::*, types::PySequence, IntoPyObjectExt}; -use sentry_arroyo::types::{Message, Partition}; +use sentry_arroyo::types::Partition; use crate::committable::{convert_committable_to_py, convert_py_committable}; -use crate::routes::RoutedValue; use crate::utils::traced_with_gil; // Used from `mod tests` only; the library does not read headers from the Python constructor. @@ -120,7 +119,7 @@ impl Clone for WatermarkMessage { traced_with_gil!(|py| { let committable = py_watermark.committable.clone_ref(py); let timestamp = py_watermark.timestamp.clone_ref(py); - let message_time = py_watermark.message_time.as_ref().map(|m| m.clone_ref(py)); + let message_time = py_watermark.message_time; WatermarkMessage::PyWatermark(PyWatermark { committable, timestamp, @@ -137,9 +136,10 @@ impl Clone for WatermarkMessage { pub struct Watermark { pub committable: BTreeMap, pub timestamp: u64, - /// Unix epoch seconds (floor) from the newest data message since the previous watermark, - /// or the oldest message in a batch for synthetic batch watermarks. `None` if no such message. - pub message_time: Option, + /// Unix epoch seconds (with sub-second precision) from the newest data message since the + /// previous watermark, or the oldest message in a batch for synthetic batch watermarks. + /// `None` if no such message. + pub message_time: Option, } impl Watermark { @@ -154,7 +154,7 @@ impl Watermark { pub fn with_message_time( committable: BTreeMap, timestamp: u64, - message_time: Option, + message_time: Option, ) -> Self { Self { committable, @@ -174,7 +174,7 @@ pub struct PyWatermark { #[pyo3(get)] pub timestamp: Py, #[pyo3(get)] - pub message_time: Option>, + pub message_time: Option, } #[pymethods] @@ -184,7 +184,7 @@ impl PyWatermark { pub fn new( committable: Py, timestamp: Py, - message_time: Option>, + message_time: Option, ) -> PyResult { Ok(Self { committable, @@ -418,7 +418,7 @@ impl RoutedValuePayload { pub fn make_watermark_payload( committable: BTreeMap, timestamp: u64, - message_time: Option, + message_time: Option, ) -> Self { RoutedValuePayload::WatermarkMessage(WatermarkMessage::Watermark( Watermark::with_message_time(committable, timestamp, message_time), @@ -426,25 +426,6 @@ impl RoutedValuePayload { } } -/// Unix epoch seconds (floor) from a streaming payload (requires GIL). -pub fn streaming_payload_timestamp_floor_secs(py: Python<'_>, sm: &PyStreamingMessage) -> u64 { - match sm { - PyStreamingMessage::PyAnyMessage { content } => content.bind(py).borrow().timestamp as u64, - PyStreamingMessage::RawMessage { content } => content.bind(py).borrow().timestamp as u64, - } -} - -/// For a routed broker/data message, the logical message time used for watermark `message_time`. -pub fn data_message_time_secs_from_routed(message: &Message) -> Option { - let rv = message.payload(); - match &rv.payload { - RoutedValuePayload::WatermarkMessage(_) => None, - RoutedValuePayload::PyStreamingMessage(ref sm) => Some(traced_with_gil!(|py| { - streaming_payload_timestamp_floor_secs(py, sm) - })), - } -} - impl Clone for RoutedValuePayload { fn clone(&self) -> Self { match self { @@ -465,7 +446,7 @@ impl From<&WatermarkMessage> for Py { WatermarkMessage::Watermark(watermark) => PyWatermark::new( convert_committable_to_py(py, watermark.committable.clone()).unwrap(), make_py_int(py, watermark.timestamp), - watermark.message_time.map(|t| make_py_int(py, t)), + watermark.message_time, ) .unwrap() .into_py_any(py) @@ -473,7 +454,7 @@ impl From<&WatermarkMessage> for Py { WatermarkMessage::PyWatermark(watermark) => PyWatermark::new( watermark.committable.clone_ref(py), watermark.timestamp.clone_ref(py), - watermark.message_time.as_ref().map(|m| m.clone_ref(py)), + watermark.message_time, ) .unwrap() .into_py_any(py) @@ -551,15 +532,7 @@ impl TryFrom> for WatermarkMessage { Err(e) => return Err(pyo3::exceptions::PyTypeError::new_err(e.to_string())), }; - let message_time = match &py_watermark.borrow().message_time { - None => None, - Some(py_int) => Some( - py_int - .bind(py) - .extract::() - .map_err(|e| pyo3::exceptions::PyTypeError::new_err(e.to_string()))?, - ), - }; + let message_time = py_watermark.borrow().message_time; Ok(WatermarkMessage::Watermark(Watermark::with_message_time( committable, diff --git a/sentry_streams/src/sinks.rs b/sentry_streams/src/sinks.rs index 0aa2e581..d16f0144 100644 --- a/sentry_streams/src/sinks.rs +++ b/sentry_streams/src/sinks.rs @@ -43,7 +43,7 @@ struct SerializableWatermark { committable: HashMap>>, timestamp: u64, #[serde(default)] - message_time: Option, + message_time: Option, } impl From for SerializableWatermark { diff --git a/sentry_streams/src/watermark.rs b/sentry_streams/src/watermark.rs index c55d4fc1..0a520726 100644 --- a/sentry_streams/src/watermark.rs +++ b/sentry_streams/src/watermark.rs @@ -1,5 +1,6 @@ -use crate::messages::{data_message_time_secs_from_routed, RoutedValuePayload}; +use crate::messages::{PyStreamingMessage, RoutedValuePayload}; use crate::routes::{Route, RoutedValue}; +use crate::utils::traced_with_gil; use sentry_arroyo::processing::strategies::{ CommitRequest, InvalidMessage, ProcessingStrategy, StrategyError, SubmitError, }; @@ -25,8 +26,9 @@ pub struct WatermarkEmitter { pub period: u64, pub watermark_committable: BTreeMap, last_sent_timestamp: u64, - /// Latest data message time (epoch seconds) since the last emitted watermark. - last_data_message_time: Option, + /// Latest data message time (epoch seconds, sub-second precision) since the last emitted + /// watermark. + last_data_message_time: Option, } impl WatermarkEmitter { @@ -103,8 +105,12 @@ impl ProcessingStrategy for WatermarkEmitter { fn submit(&mut self, message: Message) -> Result<(), SubmitError> { self.merge_watermark_committable(&message); - if let Some(t) = data_message_time_secs_from_routed(&message) { - self.last_data_message_time = Some(t); + if let RoutedValuePayload::PyStreamingMessage(ref sm) = &message.payload().payload { + let ts = traced_with_gil!(|py| match sm { + PyStreamingMessage::PyAnyMessage { content } => content.bind(py).borrow().timestamp, + PyStreamingMessage::RawMessage { content } => content.bind(py).borrow().timestamp, + }); + self.last_data_message_time = Some(ts); } self.next_step.submit(message) } @@ -198,7 +204,7 @@ mod tests { vec![Watermark::with_message_time( expected_committable, 20, - Some(0), + Some(0.0), )], submitted_watermarks_clone.lock().unwrap().deref(), ); diff --git a/sentry_streams/tests/adapters/arroyo/helpers/message_helpers.py b/sentry_streams/tests/adapters/arroyo/helpers/message_helpers.py index e5f47896..cc88389a 100644 --- a/sentry_streams/tests/adapters/arroyo/helpers/message_helpers.py +++ b/sentry_streams/tests/adapters/arroyo/helpers/message_helpers.py @@ -108,7 +108,7 @@ def build_py_msg( def build_watermark( committable: Committable, timestamp: int, - message_time: int | None = None, + message_time: float | None = None, ) -> Tuple[PipelineMessage, Committable]: return ( PyWatermark( From e8a32dc05258dee6549719d72b9a71fbd7b20554 Mon Sep 17 00:00:00 2001 From: Filippo Pacifici Date: Thu, 7 May 2026 17:01:57 -0700 Subject: [PATCH 4/6] Remove comments --- sentry_streams/src/batch_step.rs | 4 ++-- sentry_streams/src/watermark.rs | 2 -- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/sentry_streams/src/batch_step.rs b/sentry_streams/src/batch_step.rs index d9c90829..32361b7d 100644 --- a/sentry_streams/src/batch_step.rs +++ b/sentry_streams/src/batch_step.rs @@ -128,7 +128,7 @@ impl Batch { /// Minimum of per-row logical timestamps (epoch seconds, sub-second precision), for synthetic /// watermarks. - pub fn oldest_message_time_secs(&self) -> Option { + pub fn oldest_message_time(&self) -> Option { if self.elements.is_empty() { return None; } @@ -281,7 +281,7 @@ impl BatchStep { // We create a synthetic watermark to avoid waiting for the next batch to complete before // allowing the consumer to commit. let committable_for_synthetic = b.current_offsets_snapshot(); - let batch_message_time = b.oldest_message_time_secs(); + let batch_message_time = b.oldest_message_time(); let flush_start = Instant::now(); let batch_msg = b.flush()?; get_stats().step_timing(&self.step_name, flush_start.elapsed().as_secs_f64()); diff --git a/sentry_streams/src/watermark.rs b/sentry_streams/src/watermark.rs index 0a520726..0a6ef1b6 100644 --- a/sentry_streams/src/watermark.rs +++ b/sentry_streams/src/watermark.rs @@ -26,8 +26,6 @@ pub struct WatermarkEmitter { pub period: u64, pub watermark_committable: BTreeMap, last_sent_timestamp: u64, - /// Latest data message time (epoch seconds, sub-second precision) since the last emitted - /// watermark. last_data_message_time: Option, } From 90b6fe0def597a9420b20367f45a54d0942fd7ba Mon Sep 17 00:00:00 2001 From: Filippo Pacifici Date: Fri, 8 May 2026 11:32:49 -0700 Subject: [PATCH 5/6] ref(streams): Rename watermark message_time to last_message_time Clarifies that the field refers to the last (or oldest batched) data row timestamp, not the watermark's own timestamp. - Watermark, PyWatermark, WatermarkTracker, and make_watermark_payload use last_message_time; Watermark::with_last_message_time replaces with_message_time. - SerializableWatermark serializes as last_message_time with serde alias message_time for older JSON payloads. - Batch helper renamed to oldest_batch_row_timestamp. Refs https://github.com/getsentry/streams/pull/314#discussion_r3208998677 Co-Authored-By: Claude Opus 4.7 Co-authored-by: Cursor --- .../sentry_streams/rust_streams.pyi | 4 +- sentry_streams/src/batch_step.rs | 14 ++++--- sentry_streams/src/commit_policy.rs | 24 +++++------ sentry_streams/src/fake_strategy.rs | 2 +- sentry_streams/src/messages.rs | 40 +++++++++---------- sentry_streams/src/sinks.rs | 30 ++++++++++++-- sentry_streams/src/watermark.rs | 6 +-- .../arroyo/helpers/message_helpers.py | 4 +- 8 files changed, 73 insertions(+), 51 deletions(-) diff --git a/sentry_streams/sentry_streams/rust_streams.pyi b/sentry_streams/sentry_streams/rust_streams.pyi index 889450a0..0ac038f1 100644 --- a/sentry_streams/sentry_streams/rust_streams.pyi +++ b/sentry_streams/sentry_streams/rust_streams.pyi @@ -200,11 +200,11 @@ class PyWatermark: self, payload: dict[tuple[str, int], int], timestamp: int, - message_time: float | None = None, + last_message_time: float | None = None, ) -> None: ... @property def committable(self) -> dict[tuple[str, int], int]: ... @property def timestamp(self) -> int: ... @property - def message_time(self) -> float | None: ... + def last_message_time(self) -> float | None: ... diff --git a/sentry_streams/src/batch_step.rs b/sentry_streams/src/batch_step.rs index b9d8d05b..14322640 100644 --- a/sentry_streams/src/batch_step.rs +++ b/sentry_streams/src/batch_step.rs @@ -134,7 +134,9 @@ impl Batch { /// Minimum of per-row logical timestamps (epoch seconds, sub-second precision), for synthetic /// watermarks. - pub fn oldest_message_time(&self) -> Option { + /// Minimum of per-row logical timestamps (epoch seconds, sub-second precision) in this batch; + /// used as `last_message_time` on synthetic watermarks after flush. + pub fn oldest_batch_row_timestamp(&self) -> Option { if self.elements.is_empty() { return None; } @@ -287,7 +289,7 @@ impl BatchStep { // We create a synthetic watermark to avoid waiting for the next batch to complete before // allowing the consumer to commit. let committable_for_synthetic = b.current_offsets_snapshot(); - let batch_message_time = b.oldest_message_time(); + let batch_last_message_time = b.oldest_batch_row_timestamp(); let batch_elements = b.len() as f64; let batch_open_ms = b.created_at.elapsed().as_millis() as f64; let flush_start = Instant::now(); @@ -304,7 +306,7 @@ impl BatchStep { self.enqueue_watermark_tail( wm_after_batch, committable_for_synthetic, - batch_message_time, + batch_last_message_time, ); Ok(()) } @@ -313,7 +315,7 @@ impl BatchStep { &mut self, wm_after_batch: Vec>, committable: BTreeMap, - message_time: Option, + last_message_time: Option, ) { for m in wm_after_batch { self.outbound.push_back(m); @@ -325,7 +327,7 @@ impl BatchStep { payload: RoutedValuePayload::make_watermark_payload( committable.clone(), ts, - message_time, + last_message_time, ), }, committable, @@ -802,7 +804,7 @@ mod tests { }); let w = wms.lock().unwrap(); assert_eq!(w.len(), 1); - assert_eq!(w[0].message_time, Some(5000.25)); + assert_eq!(w[0].last_message_time, Some(5000.25)); } #[test] diff --git a/sentry_streams/src/commit_policy.rs b/sentry_streams/src/commit_policy.rs index 603435c9..32937d1e 100644 --- a/sentry_streams/src/commit_policy.rs +++ b/sentry_streams/src/commit_policy.rs @@ -13,7 +13,7 @@ use crate::routes::RoutedValue; #[cfg(not(test))] use crate::time_helpers::current_epoch; -/// Histogram: seconds from watermark `message_time` (or 0 if absent) to commit decision. +/// Histogram: seconds from watermark `last_message_time` (or 0 if absent) to commit decision. const METRIC_WATERMARK_COMMIT_LATENCY: &str = "streams.pipeline.consumer.watermark_commit_latency"; /// Records the committable of a received Watermark and records how many times that watermark has been seen. @@ -22,7 +22,7 @@ struct WatermarkTracker { num_watermarks: u64, committable: HashMap, time_added: Instant, - message_time: Option, + last_message_time: Option, } /// WatermarkCommitOffsets is a commit policy that only commits once it receives a copy of a Watermark @@ -60,10 +60,10 @@ impl WatermarkCommitOffsets { }; let mut to_remove = vec![]; let mut commit_request = empty_commit_request.clone(); - // Track the oldest (minimum) message_time across all watermarks that contribute + // Track the oldest (minimum) last_message_time across all watermarks that contribute // to the merged commit, so we record latency once per commit() based on the // watermark furthest behind. - let mut oldest_message_time: Option = None; + let mut oldest_last_message_time: Option = None; for (ts, watermark) in self.watermarks.iter() { if watermark.num_watermarks == self.num_branches { let current_request = CommitRequest { @@ -72,8 +72,8 @@ impl WatermarkCommitOffsets { commit_request = merge_commit_request(Some(commit_request), Some(current_request)).unwrap(); - if let Some(t) = watermark.message_time { - oldest_message_time = Some(match oldest_message_time { + if let Some(t) = watermark.last_message_time { + oldest_last_message_time = Some(match oldest_last_message_time { Some(prev) => prev.min(t), None => t, }); @@ -91,7 +91,7 @@ impl WatermarkCommitOffsets { } if commit_request != empty_commit_request { - let secs = oldest_message_time + let secs = oldest_last_message_time .map(|t| ((current_epoch() as f64) - t).max(0.0)) .unwrap_or(0.0); metrics::histogram!(METRIC_WATERMARK_COMMIT_LATENCY).record(secs); @@ -119,7 +119,7 @@ impl ProcessingStrategy for WatermarkCommitOffsets { num_watermarks: tracker.num_watermarks + 1, committable: tracker.committable.clone(), time_added: tracker.time_added, - message_time: tracker.message_time, + last_message_time: tracker.last_message_time, }, ); } @@ -134,7 +134,7 @@ impl ProcessingStrategy for WatermarkCommitOffsets { num_watermarks: 1, committable: committable, time_added: Instant::now(), - message_time: watermark.message_time, + last_message_time: watermark.last_message_time, }, ); } @@ -208,7 +208,7 @@ mod tests { set_timestamp(100); let mut commit_step = WatermarkCommitOffsets::new(2); - let watermark = Watermark::with_message_time(make_committable(3, 0), 0, Some(80.0)); + let watermark = Watermark::with_last_message_time(make_committable(3, 0), 0, Some(80.0)); let mut messages = vec![]; for waypoint in ["route1", "route2"] { messages.push(Message::new_any_message( @@ -237,7 +237,7 @@ mod tests { } // Second watermark actually returns CommitRequest on poll() and records the latency - // metric once based on the (only) tracker's message_time. + // metric once based on the (only) tracker's last_message_time. let _ = commit_step.submit(messages[1].clone()); assert_eq!(commit_step.watermarks[&ts].num_watermarks, 2); @@ -264,7 +264,7 @@ mod tests { assert_eq!( recorded, vec![20.0], - "expected exactly one latency sample of (current_epoch - message_time)" + "expected exactly one latency sample of (current_epoch - last_message_time)" ); set_timestamp(0); diff --git a/sentry_streams/src/fake_strategy.rs b/sentry_streams/src/fake_strategy.rs index e66703ec..b893289a 100644 --- a/sentry_streams/src/fake_strategy.rs +++ b/sentry_streams/src/fake_strategy.rs @@ -139,6 +139,6 @@ pub fn assert_watermarks_match(expected_messages: Vec, actual_message for (actual, expected) in actual_messages.iter().zip(expected_messages.iter()) { assert_eq!(actual.committable, expected.committable); assert_eq!(actual.timestamp, expected.timestamp); - assert_eq!(actual.message_time, expected.message_time); + assert_eq!(actual.last_message_time, expected.last_message_time); } } diff --git a/sentry_streams/src/messages.rs b/sentry_streams/src/messages.rs index b321753a..39075823 100644 --- a/sentry_streams/src/messages.rs +++ b/sentry_streams/src/messages.rs @@ -119,11 +119,11 @@ impl Clone for WatermarkMessage { traced_with_gil!(|py| { let committable = py_watermark.committable.clone_ref(py); let timestamp = py_watermark.timestamp.clone_ref(py); - let message_time = py_watermark.message_time; + let last_message_time = py_watermark.last_message_time; WatermarkMessage::PyWatermark(PyWatermark { committable, timestamp, - message_time, + last_message_time, }) }) } @@ -139,7 +139,7 @@ pub struct Watermark { /// Unix epoch seconds (with sub-second precision) from the newest data message since the /// previous watermark, or the oldest message in a batch for synthetic batch watermarks. /// `None` if no such message. - pub message_time: Option, + pub last_message_time: Option, } impl Watermark { @@ -147,19 +147,19 @@ impl Watermark { Self { committable, timestamp, - message_time: None, + last_message_time: None, } } - pub fn with_message_time( + pub fn with_last_message_time( committable: BTreeMap, timestamp: u64, - message_time: Option, + last_message_time: Option, ) -> Self { Self { committable, timestamp, - message_time, + last_message_time, } } } @@ -174,22 +174,22 @@ pub struct PyWatermark { #[pyo3(get)] pub timestamp: Py, #[pyo3(get)] - pub message_time: Option, + pub last_message_time: Option, } #[pymethods] impl PyWatermark { #[new] - #[pyo3(signature = (committable, timestamp, message_time=None))] + #[pyo3(signature = (committable, timestamp, last_message_time=None))] pub fn new( committable: Py, timestamp: Py, - message_time: Option, + last_message_time: Option, ) -> PyResult { Ok(Self { committable, timestamp, - message_time, + last_message_time, }) } } @@ -418,10 +418,10 @@ impl RoutedValuePayload { pub fn make_watermark_payload( committable: BTreeMap, timestamp: u64, - message_time: Option, + last_message_time: Option, ) -> Self { RoutedValuePayload::WatermarkMessage(WatermarkMessage::Watermark( - Watermark::with_message_time(committable, timestamp, message_time), + Watermark::with_last_message_time(committable, timestamp, last_message_time), )) } } @@ -446,7 +446,7 @@ impl From<&WatermarkMessage> for Py { WatermarkMessage::Watermark(watermark) => PyWatermark::new( convert_committable_to_py(py, watermark.committable.clone()).unwrap(), make_py_int(py, watermark.timestamp), - watermark.message_time, + watermark.last_message_time, ) .unwrap() .into_py_any(py) @@ -454,7 +454,7 @@ impl From<&WatermarkMessage> for Py { WatermarkMessage::PyWatermark(watermark) => PyWatermark::new( watermark.committable.clone_ref(py), watermark.timestamp.clone_ref(py), - watermark.message_time, + watermark.last_message_time, ) .unwrap() .into_py_any(py) @@ -532,13 +532,11 @@ impl TryFrom> for WatermarkMessage { Err(e) => return Err(pyo3::exceptions::PyTypeError::new_err(e.to_string())), }; - let message_time = py_watermark.borrow().message_time; + let last_message_time = py_watermark.borrow().last_message_time; - Ok(WatermarkMessage::Watermark(Watermark::with_message_time( - committable, - timestamp, - message_time, - ))) + Ok(WatermarkMessage::Watermark( + Watermark::with_last_message_time(committable, timestamp, last_message_time), + )) } else { Err(pyo3::exceptions::PyTypeError::new_err(format!( "Message type is invalid: expected PyWatermark, got {}", diff --git a/sentry_streams/src/sinks.rs b/sentry_streams/src/sinks.rs index d16f0144..6832712a 100644 --- a/sentry_streams/src/sinks.rs +++ b/sentry_streams/src/sinks.rs @@ -42,8 +42,8 @@ use std::time::Duration; struct SerializableWatermark { committable: HashMap>>, timestamp: u64, - #[serde(default)] - message_time: Option, + #[serde(default, alias = "message_time")] + last_message_time: Option, } impl From for SerializableWatermark { @@ -68,7 +68,7 @@ impl From for SerializableWatermark { SerializableWatermark { committable, timestamp: value.timestamp, - message_time: value.message_time, + last_message_time: value.last_message_time, } } } @@ -87,7 +87,7 @@ impl From for Watermark { Watermark { committable, timestamp: value.timestamp, - message_time: value.message_time, + last_message_time: value.last_message_time, } } } @@ -305,6 +305,28 @@ mod tests { assert_eq!(watermark, converted_watermark) } + #[test] + fn deserializes_legacy_message_time_json_field() { + let json = br#"{"committable":{},"timestamp":0,"message_time":42.5}"#; + let sw: SerializableWatermark = serde_json::from_slice(json).unwrap(); + assert_eq!(sw.last_message_time, Some(42.5)); + } + + #[test] + fn serializes_last_message_time_json_field() { + let w = Watermark::with_last_message_time(BTreeMap::new(), 1, Some(3.0)); + let sw: SerializableWatermark = w.into(); + let s = serde_json::to_string(&sw).unwrap(); + assert!( + s.contains("last_message_time"), + "expected new key in JSON: {s}" + ); + assert!( + !s.contains("\"message_time\""), + "legacy key should not appear as its own field: {s}" + ); + } + #[test] fn test_kafka_payload() { crate::testutils::initialize_python(); diff --git a/sentry_streams/src/watermark.rs b/sentry_streams/src/watermark.rs index 0a6ef1b6..7c6c388e 100644 --- a/sentry_streams/src/watermark.rs +++ b/sentry_streams/src/watermark.rs @@ -53,13 +53,13 @@ impl WatermarkEmitter { fn send_watermark_msg(&mut self) -> Result<(), InvalidMessage> { let timestamp = current_epoch(); - let message_time = self.last_data_message_time; + let last_message_time = self.last_data_message_time; let watermark_msg = RoutedValue { route: self.route.clone(), payload: RoutedValuePayload::make_watermark_payload( self.watermark_committable.clone(), timestamp, - message_time, + last_message_time, ), }; let result = self.next_step.submit(Message::new_any_message( @@ -199,7 +199,7 @@ mod tests { set_timestamp(20); let _ = watermark.poll(); assert_watermarks_match( - vec![Watermark::with_message_time( + vec![Watermark::with_last_message_time( expected_committable, 20, Some(0.0), diff --git a/sentry_streams/tests/adapters/arroyo/helpers/message_helpers.py b/sentry_streams/tests/adapters/arroyo/helpers/message_helpers.py index cc88389a..7554d936 100644 --- a/sentry_streams/tests/adapters/arroyo/helpers/message_helpers.py +++ b/sentry_streams/tests/adapters/arroyo/helpers/message_helpers.py @@ -108,13 +108,13 @@ def build_py_msg( def build_watermark( committable: Committable, timestamp: int, - message_time: float | None = None, + last_message_time: float | None = None, ) -> Tuple[PipelineMessage, Committable]: return ( PyWatermark( committable, timestamp, - message_time, + last_message_time, ), committable, ) From 19c6aea20ac06d746bd04348156e5268b833966b Mon Sep 17 00:00:00 2001 From: Filippo Pacifici Date: Fri, 8 May 2026 11:50:31 -0700 Subject: [PATCH 6/6] Fix comment --- sentry_streams/src/batch_step.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/sentry_streams/src/batch_step.rs b/sentry_streams/src/batch_step.rs index 14322640..2f3fe952 100644 --- a/sentry_streams/src/batch_step.rs +++ b/sentry_streams/src/batch_step.rs @@ -132,8 +132,6 @@ impl Batch { self.batch_offsets.clone() } - /// Minimum of per-row logical timestamps (epoch seconds, sub-second precision), for synthetic - /// watermarks. /// Minimum of per-row logical timestamps (epoch seconds, sub-second precision) in this batch; /// used as `last_message_time` on synthetic watermarks after flush. pub fn oldest_batch_row_timestamp(&self) -> Option {