diff --git a/sentry_streams/sentry_streams/rust_streams.pyi b/sentry_streams/sentry_streams/rust_streams.pyi index 84257e98..0ac038f1 100644 --- a/sentry_streams/sentry_streams/rust_streams.pyi +++ b/sentry_streams/sentry_streams/rust_streams.pyi @@ -200,8 +200,11 @@ class PyWatermark: self, payload: dict[tuple[str, int], int], timestamp: int, + last_message_time: float | None = None, ) -> None: ... @property def committable(self) -> dict[tuple[str, int], int]: ... @property def timestamp(self) -> int: ... + @property + def last_message_time(self) -> float | None: ... diff --git a/sentry_streams/src/batch_step.rs b/sentry_streams/src/batch_step.rs index e84a003c..2f3fe952 100644 --- a/sentry_streams/src/batch_step.rs +++ b/sentry_streams/src/batch_step.rs @@ -132,6 +132,29 @@ impl Batch { self.batch_offsets.clone() } + /// Minimum of per-row logical timestamps (epoch seconds, sub-second precision) in this batch; + /// used as `last_message_time` on synthetic watermarks after flush. + pub fn oldest_batch_row_timestamp(&self) -> Option { + if self.elements.is_empty() { + return None; + } + traced_with_gil!(|py| { + let mut min_t: Option = None; + for el in &self.elements { + let t = match el { + PyStreamingMessage::PyAnyMessage { content } => { + content.bind(py).borrow().timestamp + } + PyStreamingMessage::RawMessage { content } => { + content.bind(py).borrow().timestamp + } + }; + min_t = Some(min_t.map_or(t, |m| m.min(t))); + } + min_t + }) + } + pub fn flush(&self) -> Result, StrategyError> { let route = self.route.clone(); let committable = self.batch_offsets.clone(); @@ -264,6 +287,7 @@ impl BatchStep { // We create a synthetic watermark to avoid waiting for the next batch to complete before // allowing the consumer to commit. let committable_for_synthetic = b.current_offsets_snapshot(); + let batch_last_message_time = b.oldest_batch_row_timestamp(); let batch_elements = b.len() as f64; let batch_open_ms = b.created_at.elapsed().as_millis() as f64; let flush_start = Instant::now(); @@ -277,7 +301,11 @@ impl BatchStep { self.outbound.push_back(batch_msg); self.pending_batch = true; - self.enqueue_watermark_tail(wm_after_batch, committable_for_synthetic); + self.enqueue_watermark_tail( + wm_after_batch, + committable_for_synthetic, + batch_last_message_time, + ); Ok(()) } @@ -285,6 +313,7 @@ impl BatchStep { &mut self, wm_after_batch: Vec>, committable: BTreeMap, + last_message_time: Option, ) { for m in wm_after_batch { self.outbound.push_back(m); @@ -293,7 +322,11 @@ impl BatchStep { let wmk = Message::new_any_message( RoutedValue { route: self.route.clone(), - payload: RoutedValuePayload::make_watermark_payload(committable.clone(), ts), + payload: RoutedValuePayload::make_watermark_payload( + committable.clone(), + ts, + last_message_time, + ), }, committable, ); @@ -595,7 +628,9 @@ mod tests { use super::super::{BatchStep, Message}; use crate::fake_strategy::FakeStrategy; - use crate::testutils::{build_raw_routed_value, build_routed_value}; + use crate::testutils::{ + build_raw_routed_value, build_routed_value, build_routed_value_with_timestamp, + }; use crate::utils::traced_with_gil; use chrono::Utc; use pyo3::prelude::*; @@ -738,7 +773,7 @@ mod tests { let (mut step, _out, wms) = batch_step_with_fake(route.clone(), None, None); let rv = crate::routes::RoutedValue { route, - payload: RoutedValuePayload::make_watermark_payload(BTreeMap::new(), 0), + payload: RoutedValuePayload::make_watermark_payload(BTreeMap::new(), 0, None), }; let m = Message::new_any_message(rv, BTreeMap::new()); step.submit(m) @@ -746,6 +781,30 @@ mod tests { assert_eq!(wms.lock().unwrap().len(), 1); } + #[test] + fn synthetic_watermark_uses_oldest_row_timestamp() { + let route = Route::new("s".into(), vec!["w".into()]); + let (mut step, _out, wms) = batch_step_with_fake(route, Some(2), None); + traced_with_gil!(|py| { + let p1 = 1i32.into_pyobject(py).unwrap().into_any().unbind(); + let p2 = 2i32.into_pyobject(py).unwrap().into_any().unbind(); + let m1 = Message::new_any_message( + build_routed_value_with_timestamp(py, p1, "s", vec!["w".into()], 5000.25), + BTreeMap::new(), + ); + let m2 = Message::new_any_message( + build_routed_value_with_timestamp(py, p2, "s", vec!["w".into()], 9000.9), + BTreeMap::new(), + ); + step.submit(m1).unwrap(); + step.submit(m2).unwrap(); + step.poll().unwrap(); + }); + let w = wms.lock().unwrap(); + assert_eq!(w.len(), 1); + assert_eq!(w[0].last_message_time, Some(5000.25)); + } + #[test] fn submit_rejects_while_batch_stalled_in_outbound() { let route = Route::new("s".into(), vec!["w".into()]); diff --git a/sentry_streams/src/commit_policy.rs b/sentry_streams/src/commit_policy.rs index 3ecca9f0..32937d1e 100644 --- a/sentry_streams/src/commit_policy.rs +++ b/sentry_streams/src/commit_policy.rs @@ -7,7 +7,14 @@ use sentry_arroyo::processing::strategies::{ use sentry_arroyo::types::{Message, Partition}; use crate::messages::{RoutedValuePayload, WatermarkMessage}; +#[cfg(test)] +use crate::mocks::current_epoch; use crate::routes::RoutedValue; +#[cfg(not(test))] +use crate::time_helpers::current_epoch; + +/// Histogram: seconds from watermark `last_message_time` (or 0 if absent) to commit decision. +const METRIC_WATERMARK_COMMIT_LATENCY: &str = "streams.pipeline.consumer.watermark_commit_latency"; /// Records the committable of a received Watermark and records how many times that watermark has been seen. #[derive(Clone, Debug)] @@ -15,6 +22,7 @@ struct WatermarkTracker { num_watermarks: u64, committable: HashMap, time_added: Instant, + last_message_time: Option, } /// WatermarkCommitOffsets is a commit policy that only commits once it receives a copy of a Watermark @@ -52,6 +60,10 @@ impl WatermarkCommitOffsets { }; let mut to_remove = vec![]; let mut commit_request = empty_commit_request.clone(); + // Track the oldest (minimum) last_message_time across all watermarks that contribute + // to the merged commit, so we record latency once per commit() based on the + // watermark furthest behind. + let mut oldest_last_message_time: Option = None; for (ts, watermark) in self.watermarks.iter() { if watermark.num_watermarks == self.num_branches { let current_request = CommitRequest { @@ -59,6 +71,14 @@ impl WatermarkCommitOffsets { }; commit_request = merge_commit_request(Some(commit_request), Some(current_request)).unwrap(); + + if let Some(t) = watermark.last_message_time { + oldest_last_message_time = Some(match oldest_last_message_time { + Some(prev) => prev.min(t), + None => t, + }); + } + to_remove.push(ts.clone()); // Clean up any hanging watermarks which still haven't gotten all their copies in 5 min // from when the first copy was seen @@ -71,6 +91,10 @@ impl WatermarkCommitOffsets { } if commit_request != empty_commit_request { + let secs = oldest_last_message_time + .map(|t| ((current_epoch() as f64) - t).max(0.0)) + .unwrap_or(0.0); + metrics::histogram!(METRIC_WATERMARK_COMMIT_LATENCY).record(secs); Some(commit_request) } else { None @@ -94,7 +118,8 @@ impl ProcessingStrategy for WatermarkCommitOffsets { WatermarkTracker { num_watermarks: tracker.num_watermarks + 1, committable: tracker.committable.clone(), - time_added: tracker.time_added.clone(), + time_added: tracker.time_added, + last_message_time: tracker.last_message_time, }, ); } @@ -109,6 +134,7 @@ impl ProcessingStrategy for WatermarkCommitOffsets { num_watermarks: 1, committable: committable, time_added: Instant::now(), + last_message_time: watermark.last_message_time, }, ); } @@ -129,15 +155,60 @@ impl ProcessingStrategy for WatermarkCommitOffsets { #[cfg(test)] mod tests { - use crate::{messages::Watermark, routes::Route, testutils::make_committable}; + use crate::messages::Watermark; + use crate::mocks::set_timestamp; + use crate::{routes::Route, testutils::make_committable}; + + use metrics::{Key, KeyName, Metadata, Recorder, SharedString, Unit}; + use std::sync::{Arc, Mutex}; use super::*; + /// Minimal histogram-only recorder modeled on `pipeline_stats::tests::CaptureRecorder`. + #[derive(Default)] + struct CaptureRecorder { + histograms: Arc>>, + } + + impl Recorder for CaptureRecorder { + fn describe_counter(&self, _: KeyName, _: Option, _: SharedString) {} + fn describe_gauge(&self, _: KeyName, _: Option, _: SharedString) {} + fn describe_histogram(&self, _: KeyName, _: Option, _: SharedString) {} + fn register_counter(&self, _: &Key, _: &Metadata<'_>) -> metrics::Counter { + metrics::Counter::noop() + } + fn register_gauge(&self, _: &Key, _: &Metadata<'_>) -> metrics::Gauge { + metrics::Gauge::noop() + } + fn register_histogram(&self, key: &Key, _: &Metadata<'_>) -> metrics::Histogram { + metrics::Histogram::from_arc(Arc::new(CaptureHistogram { + key: key.clone(), + histograms: Arc::clone(&self.histograms), + })) + } + } + + struct CaptureHistogram { + key: Key, + histograms: Arc>>, + } + + impl metrics::HistogramFn for CaptureHistogram { + fn record(&self, value: f64) { + self.histograms + .lock() + .unwrap() + .push((self.key.clone(), value)); + } + } + #[test] fn test_commit_offsets() { + // Pin current_epoch so the latency metric is deterministic. + set_timestamp(100); let mut commit_step = WatermarkCommitOffsets::new(2); - let watermark = Watermark::new(make_committable(3, 0), 0); + let watermark = Watermark::with_last_message_time(make_committable(3, 0), 0, Some(80.0)); let mut messages = vec![]; for waypoint in ["route1", "route2"] { messages.push(Message::new_any_message( @@ -165,11 +236,37 @@ mod tests { ); } - // Second watermark actually returns CommitRequest on poll() + // Second watermark actually returns CommitRequest on poll() and records the latency + // metric once based on the (only) tracker's last_message_time. let _ = commit_step.submit(messages[1].clone()); assert_eq!(commit_step.watermarks[&ts].num_watermarks, 2); - if let Ok(None) = commit_step.poll() { - panic!("Commit step returned didn't return CommitRequest with 2 watermarks"); - } + + let histograms = Arc::new(Mutex::new(Vec::<(Key, f64)>::new())); + let recorder = CaptureRecorder { + histograms: Arc::clone(&histograms), + }; + let result = { + let _guard = metrics::set_default_local_recorder(&recorder); + commit_step.poll() + }; + assert!( + matches!(result, Ok(Some(_))), + "Commit step returned didn't return CommitRequest with 2 watermarks" + ); + + let recorded: Vec = histograms + .lock() + .unwrap() + .iter() + .filter(|(k, _)| k.name() == METRIC_WATERMARK_COMMIT_LATENCY) + .map(|(_, v)| *v) + .collect(); + assert_eq!( + recorded, + vec![20.0], + "expected exactly one latency sample of (current_epoch - last_message_time)" + ); + + set_timestamp(0); } } diff --git a/sentry_streams/src/dev_null_sink.rs b/sentry_streams/src/dev_null_sink.rs index 85cfcaca..851437e1 100644 --- a/sentry_streams/src/dev_null_sink.rs +++ b/sentry_streams/src/dev_null_sink.rs @@ -234,7 +234,11 @@ mod tests { let watermark_msg = sentry_arroyo::types::Message::new_any_message( crate::routes::RoutedValue { route: Route::new("source".to_string(), vec!["wp1".to_string()]), - payload: RoutedValuePayload::make_watermark_payload(watermark_payload.clone(), 0), + payload: RoutedValuePayload::make_watermark_payload( + watermark_payload.clone(), + 0, + None, + ), }, BTreeMap::new(), ); diff --git a/sentry_streams/src/fake_strategy.rs b/sentry_streams/src/fake_strategy.rs index 9ce28008..b893289a 100644 --- a/sentry_streams/src/fake_strategy.rs +++ b/sentry_streams/src/fake_strategy.rs @@ -138,5 +138,7 @@ pub fn assert_watermarks_match(expected_messages: Vec, actual_message for (actual, expected) in actual_messages.iter().zip(expected_messages.iter()) { assert_eq!(actual.committable, expected.committable); + assert_eq!(actual.timestamp, expected.timestamp); + assert_eq!(actual.last_message_time, expected.last_message_time); } } diff --git a/sentry_streams/src/filter_step.rs b/sentry_streams/src/filter_step.rs index a0bf5598..0cfb15b9 100644 --- a/sentry_streams/src/filter_step.rs +++ b/sentry_streams/src/filter_step.rs @@ -294,7 +294,7 @@ mod tests { let watermark_val = RoutedValue { route: Route::new(String::from("source"), vec![]), - payload: RoutedValuePayload::make_watermark_payload(BTreeMap::new(), 0), + payload: RoutedValuePayload::make_watermark_payload(BTreeMap::new(), 0, None), }; let watermark_msg = Message::new_any_message(watermark_val, BTreeMap::new()); let watermark_res = strategy.submit(watermark_msg); diff --git a/sentry_streams/src/header_filter_step.rs b/sentry_streams/src/header_filter_step.rs index ebffcde7..25239118 100644 --- a/sentry_streams/src/header_filter_step.rs +++ b/sentry_streams/src/header_filter_step.rs @@ -390,7 +390,7 @@ mod tests { let watermark_val = RoutedValue { route: Route::new(String::from("source"), vec![]), - payload: RoutedValuePayload::make_watermark_payload(BTreeMap::new(), 0), + payload: RoutedValuePayload::make_watermark_payload(BTreeMap::new(), 0, None), }; let watermark_msg = Message::new_any_message(watermark_val, BTreeMap::new()); assert!(strategy.submit(watermark_msg).is_ok()); diff --git a/sentry_streams/src/messages.rs b/sentry_streams/src/messages.rs index 4cf246b9..39075823 100644 --- a/sentry_streams/src/messages.rs +++ b/sentry_streams/src/messages.rs @@ -119,9 +119,11 @@ impl Clone for WatermarkMessage { traced_with_gil!(|py| { let committable = py_watermark.committable.clone_ref(py); let timestamp = py_watermark.timestamp.clone_ref(py); + let last_message_time = py_watermark.last_message_time; WatermarkMessage::PyWatermark(PyWatermark { committable, timestamp, + last_message_time, }) }) } @@ -134,6 +136,10 @@ impl Clone for WatermarkMessage { pub struct Watermark { pub committable: BTreeMap, pub timestamp: u64, + /// Unix epoch seconds (with sub-second precision) from the newest data message since the + /// previous watermark, or the oldest message in a batch for synthetic batch watermarks. + /// `None` if no such message. + pub last_message_time: Option, } impl Watermark { @@ -141,6 +147,19 @@ impl Watermark { Self { committable, timestamp, + last_message_time: None, + } + } + + pub fn with_last_message_time( + committable: BTreeMap, + timestamp: u64, + last_message_time: Option, + ) -> Self { + Self { + committable, + timestamp, + last_message_time, } } } @@ -154,15 +173,23 @@ pub struct PyWatermark { pub committable: Py, #[pyo3(get)] pub timestamp: Py, + #[pyo3(get)] + pub last_message_time: Option, } #[pymethods] impl PyWatermark { #[new] - pub fn new(committable: Py, timestamp: Py) -> PyResult { + #[pyo3(signature = (committable, timestamp, last_message_time=None))] + pub fn new( + committable: Py, + timestamp: Py, + last_message_time: Option, + ) -> PyResult { Ok(Self { committable, timestamp, + last_message_time, }) } } @@ -388,11 +415,14 @@ impl RoutedValuePayload { } } - pub fn make_watermark_payload(committable: BTreeMap, timestamp: u64) -> Self { - RoutedValuePayload::WatermarkMessage(WatermarkMessage::Watermark(Watermark::new( - committable, - timestamp, - ))) + pub fn make_watermark_payload( + committable: BTreeMap, + timestamp: u64, + last_message_time: Option, + ) -> Self { + RoutedValuePayload::WatermarkMessage(WatermarkMessage::Watermark( + Watermark::with_last_message_time(committable, timestamp, last_message_time), + )) } } @@ -416,6 +446,7 @@ impl From<&WatermarkMessage> for Py { WatermarkMessage::Watermark(watermark) => PyWatermark::new( convert_committable_to_py(py, watermark.committable.clone()).unwrap(), make_py_int(py, watermark.timestamp), + watermark.last_message_time, ) .unwrap() .into_py_any(py) @@ -423,6 +454,7 @@ impl From<&WatermarkMessage> for Py { WatermarkMessage::PyWatermark(watermark) => PyWatermark::new( watermark.committable.clone_ref(py), watermark.timestamp.clone_ref(py), + watermark.last_message_time, ) .unwrap() .into_py_any(py) @@ -500,10 +532,11 @@ impl TryFrom> for WatermarkMessage { Err(e) => return Err(pyo3::exceptions::PyTypeError::new_err(e.to_string())), }; - Ok(WatermarkMessage::Watermark(Watermark::new( - committable, - timestamp, - ))) + let last_message_time = py_watermark.borrow().last_message_time; + + Ok(WatermarkMessage::Watermark( + Watermark::with_last_message_time(committable, timestamp, last_message_time), + )) } else { Err(pyo3::exceptions::PyTypeError::new_err(format!( "Message type is invalid: expected PyWatermark, got {}", @@ -709,7 +742,8 @@ mod tests { // Create PyAnyMessage let msg = - PyWatermark::new(committable.unbind().clone_ref(py), make_py_int(py, 0)).unwrap(); + PyWatermark::new(committable.unbind().clone_ref(py), make_py_int(py, 0), None) + .unwrap(); // Check payload let payload_val: BTreeMap<(String, u64), u64> = @@ -723,7 +757,7 @@ mod tests { #[test] fn test_is_watermark_message() { - let wmsg = RoutedValuePayload::make_watermark_payload(BTreeMap::new(), 0); + let wmsg = RoutedValuePayload::make_watermark_payload(BTreeMap::new(), 0, None); assert!(wmsg.is_watermark_msg()); } @@ -756,7 +790,7 @@ mod tests { #[test] #[should_panic] fn test_unwrap_payload_watermark_msg() { - let wmsg = RoutedValuePayload::make_watermark_payload(BTreeMap::new(), 0); + let wmsg = RoutedValuePayload::make_watermark_payload(BTreeMap::new(), 0, None); wmsg.unwrap_payload(); } } diff --git a/sentry_streams/src/python_operator.rs b/sentry_streams/src/python_operator.rs index 1a62a620..d7d094f6 100644 --- a/sentry_streams/src/python_operator.rs +++ b/sentry_streams/src/python_operator.rs @@ -472,7 +472,7 @@ class RustOperatorDelegateFactory: let watermark_val = RoutedValue { route: Route::new(String::from("source"), vec![]), - payload: RoutedValuePayload::make_watermark_payload(BTreeMap::new(), 0), + payload: RoutedValuePayload::make_watermark_payload(BTreeMap::new(), 0, None), }; let watermark_msg = Message::new_any_message(watermark_val, BTreeMap::new()); let watermark_res = operator.submit(watermark_msg); diff --git a/sentry_streams/src/sinks.rs b/sentry_streams/src/sinks.rs index dd539087..6832712a 100644 --- a/sentry_streams/src/sinks.rs +++ b/sentry_streams/src/sinks.rs @@ -42,6 +42,8 @@ use std::time::Duration; struct SerializableWatermark { committable: HashMap>>, timestamp: u64, + #[serde(default, alias = "message_time")] + last_message_time: Option, } impl From for SerializableWatermark { @@ -66,6 +68,7 @@ impl From for SerializableWatermark { SerializableWatermark { committable, timestamp: value.timestamp, + last_message_time: value.last_message_time, } } } @@ -84,6 +87,7 @@ impl From for Watermark { Watermark { committable, timestamp: value.timestamp, + last_message_time: value.last_message_time, } } } @@ -301,6 +305,28 @@ mod tests { assert_eq!(watermark, converted_watermark) } + #[test] + fn deserializes_legacy_message_time_json_field() { + let json = br#"{"committable":{},"timestamp":0,"message_time":42.5}"#; + let sw: SerializableWatermark = serde_json::from_slice(json).unwrap(); + assert_eq!(sw.last_message_time, Some(42.5)); + } + + #[test] + fn serializes_last_message_time_json_field() { + let w = Watermark::with_last_message_time(BTreeMap::new(), 1, Some(3.0)); + let sw: SerializableWatermark = w.into(); + let s = serde_json::to_string(&sw).unwrap(); + assert!( + s.contains("last_message_time"), + "expected new key in JSON: {s}" + ); + assert!( + !s.contains("\"message_time\""), + "legacy key should not appear as its own field: {s}" + ); + } + #[test] fn test_kafka_payload() { crate::testutils::initialize_python(); @@ -373,7 +399,7 @@ mod tests { let watermark_val = RoutedValue { route: Route::new(String::from("source"), vec![]), - payload: RoutedValuePayload::make_watermark_payload(BTreeMap::new(), 0), + payload: RoutedValuePayload::make_watermark_payload(BTreeMap::new(), 0, None), }; let watermark_msg = Message::new_any_message(watermark_val, BTreeMap::new()); let watermark_res = sink.submit(watermark_msg); diff --git a/sentry_streams/src/testutils.rs b/sentry_streams/src/testutils.rs index 787abc2b..7f861025 100644 --- a/sentry_streams/src/testutils.rs +++ b/sentry_streams/src/testutils.rs @@ -51,6 +51,17 @@ pub fn build_routed_value( msg_payload: Py, source: &str, waypoints: Vec, +) -> RoutedValue { + build_routed_value_with_timestamp(py, msg_payload, source, waypoints, 0.0) +} + +#[cfg(test)] +pub fn build_routed_value_with_timestamp( + py: Python<'_>, + msg_payload: Py, + source: &str, + waypoints: Vec, + timestamp: f64, ) -> RoutedValue { let route = Route::new(source.to_string(), waypoints); let payload = PyStreamingMessage::PyAnyMessage { @@ -59,7 +70,7 @@ pub fn build_routed_value( PyAnyMessage { payload: msg_payload, headers: vec![], - timestamp: 0.0, + timestamp, schema: None, }, ) diff --git a/sentry_streams/src/transformer.rs b/sentry_streams/src/transformer.rs index 931b5353..6fe3d325 100644 --- a/sentry_streams/src/transformer.rs +++ b/sentry_streams/src/transformer.rs @@ -250,7 +250,7 @@ mod tests { let watermark_val = RoutedValue { route: Route::new(String::from("source"), vec![]), - payload: RoutedValuePayload::make_watermark_payload(BTreeMap::new(), 0), + payload: RoutedValuePayload::make_watermark_payload(BTreeMap::new(), 0, None), }; let watermark_msg = Message::new_any_message(watermark_val, BTreeMap::new()); let watermark_res = strategy.submit(watermark_msg); diff --git a/sentry_streams/src/watermark.rs b/sentry_streams/src/watermark.rs index e9e0df12..7c6c388e 100644 --- a/sentry_streams/src/watermark.rs +++ b/sentry_streams/src/watermark.rs @@ -1,5 +1,6 @@ -use crate::messages::RoutedValuePayload; +use crate::messages::{PyStreamingMessage, RoutedValuePayload}; use crate::routes::{Route, RoutedValue}; +use crate::utils::traced_with_gil; use sentry_arroyo::processing::strategies::{ CommitRequest, InvalidMessage, ProcessingStrategy, StrategyError, SubmitError, }; @@ -25,6 +26,7 @@ pub struct WatermarkEmitter { pub period: u64, pub watermark_committable: BTreeMap, last_sent_timestamp: u64, + last_data_message_time: Option, } impl WatermarkEmitter { @@ -41,6 +43,7 @@ impl WatermarkEmitter { period, watermark_committable: empty_committable, last_sent_timestamp: current_timestamp, + last_data_message_time: None, } } @@ -50,11 +53,13 @@ impl WatermarkEmitter { fn send_watermark_msg(&mut self) -> Result<(), InvalidMessage> { let timestamp = current_epoch(); + let last_message_time = self.last_data_message_time; let watermark_msg = RoutedValue { route: self.route.clone(), payload: RoutedValuePayload::make_watermark_payload( self.watermark_committable.clone(), timestamp, + last_message_time, ), }; let result = self.next_step.submit(Message::new_any_message( @@ -65,6 +70,7 @@ impl WatermarkEmitter { Ok(..) => { self.last_sent_timestamp = timestamp; self.watermark_committable = BTreeMap::new(); + self.last_data_message_time = None; Ok(()) } Err(err) => match err { @@ -97,6 +103,13 @@ impl ProcessingStrategy for WatermarkEmitter { fn submit(&mut self, message: Message) -> Result<(), SubmitError> { self.merge_watermark_committable(&message); + if let RoutedValuePayload::PyStreamingMessage(ref sm) = &message.payload().payload { + let ts = traced_with_gil!(|py| match sm { + PyStreamingMessage::PyAnyMessage { content } => content.bind(py).borrow().timestamp, + PyStreamingMessage::RawMessage { content } => content.bind(py).borrow().timestamp, + }); + self.last_data_message_time = Some(ts); + } self.next_step.submit(message) } @@ -186,7 +199,11 @@ mod tests { set_timestamp(20); let _ = watermark.poll(); assert_watermarks_match( - vec![Watermark::new(expected_committable, 0)], + vec![Watermark::with_last_message_time( + expected_committable, + 20, + Some(0.0), + )], submitted_watermarks_clone.lock().unwrap().deref(), ); set_timestamp(0); diff --git a/sentry_streams/tests/adapters/arroyo/helpers/message_helpers.py b/sentry_streams/tests/adapters/arroyo/helpers/message_helpers.py index d0fd12f9..7554d936 100644 --- a/sentry_streams/tests/adapters/arroyo/helpers/message_helpers.py +++ b/sentry_streams/tests/adapters/arroyo/helpers/message_helpers.py @@ -106,12 +106,15 @@ def build_py_msg( def build_watermark( - committable: Committable, timestamp: int + committable: Committable, + timestamp: int, + last_message_time: float | None = None, ) -> Tuple[PipelineMessage, Committable]: return ( PyWatermark( committable, timestamp, + last_message_time, ), committable, )