Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions sentry_streams/sentry_streams/rust_streams.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,11 @@ class PyWatermark:
self,
payload: dict[tuple[str, int], int],
timestamp: int,
last_message_time: float | None = None,
) -> None: ...
@property
def committable(self) -> dict[tuple[str, int], int]: ...
@property
def timestamp(self) -> int: ...
@property
def last_message_time(self) -> float | None: ...
67 changes: 63 additions & 4 deletions sentry_streams/src/batch_step.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,29 @@ impl Batch {
self.batch_offsets.clone()
}

/// Minimum of per-row logical timestamps (epoch seconds, sub-second precision) in this batch;
/// used as `last_message_time` on synthetic watermarks after flush.
Comment thread
cursor[bot] marked this conversation as resolved.
pub fn oldest_batch_row_timestamp(&self) -> Option<f64> {
if self.elements.is_empty() {
return None;
}
traced_with_gil!(|py| {
let mut min_t: Option<f64> = None;
for el in &self.elements {
let t = match el {
PyStreamingMessage::PyAnyMessage { content } => {
content.bind(py).borrow().timestamp
}
PyStreamingMessage::RawMessage { content } => {
content.bind(py).borrow().timestamp
}
};
min_t = Some(min_t.map_or(t, |m| m.min(t)));
}
min_t
})
}

pub fn flush(&self) -> Result<Message<RoutedValue>, StrategyError> {
let route = self.route.clone();
let committable = self.batch_offsets.clone();
Expand Down Expand Up @@ -264,6 +287,7 @@ impl BatchStep {
// We create a synthetic watermark to avoid waiting for the next batch to complete before
// allowing the consumer to commit.
let committable_for_synthetic = b.current_offsets_snapshot();
let batch_last_message_time = b.oldest_batch_row_timestamp();
let batch_elements = b.len() as f64;
let batch_open_ms = b.created_at.elapsed().as_millis() as f64;
let flush_start = Instant::now();
Expand All @@ -277,14 +301,19 @@ impl BatchStep {

self.outbound.push_back(batch_msg);
self.pending_batch = true;
self.enqueue_watermark_tail(wm_after_batch, committable_for_synthetic);
self.enqueue_watermark_tail(
wm_after_batch,
committable_for_synthetic,
batch_last_message_time,
);
Ok(())
}

fn enqueue_watermark_tail(
&mut self,
wm_after_batch: Vec<Message<RoutedValue>>,
committable: BTreeMap<Partition, u64>,
last_message_time: Option<f64>,
) {
for m in wm_after_batch {
self.outbound.push_back(m);
Expand All @@ -293,7 +322,11 @@ impl BatchStep {
let wmk = Message::new_any_message(
RoutedValue {
route: self.route.clone(),
payload: RoutedValuePayload::make_watermark_payload(committable.clone(), ts),
payload: RoutedValuePayload::make_watermark_payload(
committable.clone(),
ts,
last_message_time,
),
},
committable,
);
Expand Down Expand Up @@ -595,7 +628,9 @@ mod tests {

use super::super::{BatchStep, Message};
use crate::fake_strategy::FakeStrategy;
use crate::testutils::{build_raw_routed_value, build_routed_value};
use crate::testutils::{
build_raw_routed_value, build_routed_value, build_routed_value_with_timestamp,
};
use crate::utils::traced_with_gil;
use chrono::Utc;
use pyo3::prelude::*;
Expand Down Expand Up @@ -738,14 +773,38 @@ mod tests {
let (mut step, _out, wms) = batch_step_with_fake(route.clone(), None, None);
let rv = crate::routes::RoutedValue {
route,
payload: RoutedValuePayload::make_watermark_payload(BTreeMap::new(), 0),
payload: RoutedValuePayload::make_watermark_payload(BTreeMap::new(), 0, None),
};
let m = Message::new_any_message(rv, BTreeMap::new());
step.submit(m)
.expect("watermark should go to next step when no open batch");
assert_eq!(wms.lock().unwrap().len(), 1);
}

#[test]
fn synthetic_watermark_uses_oldest_row_timestamp() {
let route = Route::new("s".into(), vec!["w".into()]);
let (mut step, _out, wms) = batch_step_with_fake(route, Some(2), None);
traced_with_gil!(|py| {
let p1 = 1i32.into_pyobject(py).unwrap().into_any().unbind();
let p2 = 2i32.into_pyobject(py).unwrap().into_any().unbind();
let m1 = Message::new_any_message(
build_routed_value_with_timestamp(py, p1, "s", vec!["w".into()], 5000.25),
BTreeMap::new(),
);
let m2 = Message::new_any_message(
build_routed_value_with_timestamp(py, p2, "s", vec!["w".into()], 9000.9),
BTreeMap::new(),
);
step.submit(m1).unwrap();
step.submit(m2).unwrap();
step.poll().unwrap();
});
let w = wms.lock().unwrap();
assert_eq!(w.len(), 1);
assert_eq!(w[0].last_message_time, Some(5000.25));
}

#[test]
fn submit_rejects_while_batch_stalled_in_outbound() {
let route = Route::new("s".into(), vec!["w".into()]);
Expand Down
111 changes: 104 additions & 7 deletions sentry_streams/src/commit_policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,22 @@ use sentry_arroyo::processing::strategies::{
use sentry_arroyo::types::{Message, Partition};

use crate::messages::{RoutedValuePayload, WatermarkMessage};
#[cfg(test)]
use crate::mocks::current_epoch;
use crate::routes::RoutedValue;
#[cfg(not(test))]
use crate::time_helpers::current_epoch;

/// Histogram: seconds from watermark `last_message_time` (or 0 if absent) to commit decision.
const METRIC_WATERMARK_COMMIT_LATENCY: &str = "streams.pipeline.consumer.watermark_commit_latency";

/// Records the committable of a received Watermark and records how many times that watermark has been seen.
#[derive(Clone, Debug)]
struct WatermarkTracker {
num_watermarks: u64,
committable: HashMap<Partition, u64>,
time_added: Instant,
last_message_time: Option<f64>,
}

/// WatermarkCommitOffsets is a commit policy that only commits once it receives a copy of a Watermark
Expand Down Expand Up @@ -52,13 +60,25 @@ impl WatermarkCommitOffsets {
};
let mut to_remove = vec![];
let mut commit_request = empty_commit_request.clone();
// Track the oldest (minimum) last_message_time across all watermarks that contribute
// to the merged commit, so we record latency once per commit() based on the
// watermark furthest behind.
let mut oldest_last_message_time: Option<f64> = None;
for (ts, watermark) in self.watermarks.iter() {
if watermark.num_watermarks == self.num_branches {
let current_request = CommitRequest {
positions: watermark.committable.clone(),
};
commit_request =
merge_commit_request(Some(commit_request), Some(current_request)).unwrap();

if let Some(t) = watermark.last_message_time {
oldest_last_message_time = Some(match oldest_last_message_time {
Some(prev) => prev.min(t),
None => t,
});
}

Comment thread
fpacifici marked this conversation as resolved.
to_remove.push(ts.clone());
// Clean up any hanging watermarks which still haven't gotten all their copies in 5 min
// from when the first copy was seen
Expand All @@ -71,6 +91,10 @@ impl WatermarkCommitOffsets {
}

if commit_request != empty_commit_request {
let secs = oldest_last_message_time
.map(|t| ((current_epoch() as f64) - t).max(0.0))
.unwrap_or(0.0);
metrics::histogram!(METRIC_WATERMARK_COMMIT_LATENCY).record(secs);
Comment thread
cursor[bot] marked this conversation as resolved.
Comment thread
fpacifici marked this conversation as resolved.
Some(commit_request)
} else {
None
Expand All @@ -94,7 +118,8 @@ impl ProcessingStrategy<RoutedValue> for WatermarkCommitOffsets {
WatermarkTracker {
num_watermarks: tracker.num_watermarks + 1,
committable: tracker.committable.clone(),
time_added: tracker.time_added.clone(),
time_added: tracker.time_added,
last_message_time: tracker.last_message_time,
},
);
}
Comment on lines 118 to 125
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: When two watermarks have the same timestamp, the last_message_time from the second one is discarded, potentially leading to inaccurate latency metrics.
Severity: MEDIUM

Suggested Fix

When a watermark arrives with a timestamp that is already being tracked, update the tracker's last_message_time to be the minimum of the existing value and the incoming watermark's last_message_time. This ensures the oldest message time is always tracked, fulfilling the intended behavior.

Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent. Verify if this is a real issue. If it is, propose a fix; if not, explain why it's
not valid.

Location: sentry_streams/src/commit_policy.rs#L118-L125

Potential issue: In a multi-branch pipeline, if two branches generate synthetic
watermarks within the same second, they will have identical timestamps but potentially
different `last_message_time` values. The `WatermarkTracker`'s `submit()` function only
keeps the `last_message_time` from the first watermark to arrive with that timestamp,
silently discarding the value from the second. This can lead to underreporting consumer
lag, as the oldest `last_message_time` across all branches might be ignored. This is
particularly relevant for router/filter pipelines with `BatchStep`s processing different
data subsets.

Expand All @@ -109,6 +134,7 @@ impl ProcessingStrategy<RoutedValue> for WatermarkCommitOffsets {
num_watermarks: 1,
committable: committable,
time_added: Instant::now(),
last_message_time: watermark.last_message_time,
},
);
}
Expand All @@ -129,15 +155,60 @@ impl ProcessingStrategy<RoutedValue> for WatermarkCommitOffsets {

#[cfg(test)]
mod tests {
use crate::{messages::Watermark, routes::Route, testutils::make_committable};
use crate::messages::Watermark;
use crate::mocks::set_timestamp;
use crate::{routes::Route, testutils::make_committable};

use metrics::{Key, KeyName, Metadata, Recorder, SharedString, Unit};
use std::sync::{Arc, Mutex};

use super::*;

/// Minimal histogram-only recorder modeled on `pipeline_stats::tests::CaptureRecorder`.
#[derive(Default)]
struct CaptureRecorder {
histograms: Arc<Mutex<Vec<(Key, f64)>>>,
}

impl Recorder for CaptureRecorder {
fn describe_counter(&self, _: KeyName, _: Option<Unit>, _: SharedString) {}
fn describe_gauge(&self, _: KeyName, _: Option<Unit>, _: SharedString) {}
fn describe_histogram(&self, _: KeyName, _: Option<Unit>, _: SharedString) {}
fn register_counter(&self, _: &Key, _: &Metadata<'_>) -> metrics::Counter {
metrics::Counter::noop()
}
fn register_gauge(&self, _: &Key, _: &Metadata<'_>) -> metrics::Gauge {
metrics::Gauge::noop()
}
fn register_histogram(&self, key: &Key, _: &Metadata<'_>) -> metrics::Histogram {
metrics::Histogram::from_arc(Arc::new(CaptureHistogram {
key: key.clone(),
histograms: Arc::clone(&self.histograms),
}))
}
}

struct CaptureHistogram {
key: Key,
histograms: Arc<Mutex<Vec<(Key, f64)>>>,
}

impl metrics::HistogramFn for CaptureHistogram {
fn record(&self, value: f64) {
self.histograms
.lock()
.unwrap()
.push((self.key.clone(), value));
}
}

#[test]
fn test_commit_offsets() {
// Pin current_epoch so the latency metric is deterministic.
set_timestamp(100);
let mut commit_step = WatermarkCommitOffsets::new(2);

let watermark = Watermark::new(make_committable(3, 0), 0);
let watermark = Watermark::with_last_message_time(make_committable(3, 0), 0, Some(80.0));
let mut messages = vec![];
for waypoint in ["route1", "route2"] {
messages.push(Message::new_any_message(
Expand Down Expand Up @@ -165,11 +236,37 @@ mod tests {
);
}

// Second watermark actually returns CommitRequest on poll()
// Second watermark actually returns CommitRequest on poll() and records the latency
// metric once based on the (only) tracker's last_message_time.
let _ = commit_step.submit(messages[1].clone());
assert_eq!(commit_step.watermarks[&ts].num_watermarks, 2);
if let Ok(None) = commit_step.poll() {
panic!("Commit step returned didn't return CommitRequest with 2 watermarks");
}

let histograms = Arc::new(Mutex::new(Vec::<(Key, f64)>::new()));
let recorder = CaptureRecorder {
histograms: Arc::clone(&histograms),
};
let result = {
let _guard = metrics::set_default_local_recorder(&recorder);
commit_step.poll()
};
assert!(
matches!(result, Ok(Some(_))),
"Commit step returned didn't return CommitRequest with 2 watermarks"
);

let recorded: Vec<f64> = histograms
.lock()
.unwrap()
.iter()
.filter(|(k, _)| k.name() == METRIC_WATERMARK_COMMIT_LATENCY)
.map(|(_, v)| *v)
.collect();
assert_eq!(
recorded,
vec![20.0],
"expected exactly one latency sample of (current_epoch - last_message_time)"
);

set_timestamp(0);
}
}
6 changes: 5 additions & 1 deletion sentry_streams/src/dev_null_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,11 @@ mod tests {
let watermark_msg = sentry_arroyo::types::Message::new_any_message(
crate::routes::RoutedValue {
route: Route::new("source".to_string(), vec!["wp1".to_string()]),
payload: RoutedValuePayload::make_watermark_payload(watermark_payload.clone(), 0),
payload: RoutedValuePayload::make_watermark_payload(
watermark_payload.clone(),
0,
None,
),
},
BTreeMap::new(),
);
Expand Down
2 changes: 2 additions & 0 deletions sentry_streams/src/fake_strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,5 +138,7 @@ pub fn assert_watermarks_match(expected_messages: Vec<Watermark>, actual_message

for (actual, expected) in actual_messages.iter().zip(expected_messages.iter()) {
assert_eq!(actual.committable, expected.committable);
assert_eq!(actual.timestamp, expected.timestamp);
assert_eq!(actual.last_message_time, expected.last_message_time);
}
}
2 changes: 1 addition & 1 deletion sentry_streams/src/filter_step.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ mod tests {

let watermark_val = RoutedValue {
route: Route::new(String::from("source"), vec![]),
payload: RoutedValuePayload::make_watermark_payload(BTreeMap::new(), 0),
payload: RoutedValuePayload::make_watermark_payload(BTreeMap::new(), 0, None),
};
let watermark_msg = Message::new_any_message(watermark_val, BTreeMap::new());
let watermark_res = strategy.submit(watermark_msg);
Expand Down
2 changes: 1 addition & 1 deletion sentry_streams/src/header_filter_step.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ mod tests {

let watermark_val = RoutedValue {
route: Route::new(String::from("source"), vec![]),
payload: RoutedValuePayload::make_watermark_payload(BTreeMap::new(), 0),
payload: RoutedValuePayload::make_watermark_payload(BTreeMap::new(), 0, None),
};
let watermark_msg = Message::new_any_message(watermark_val, BTreeMap::new());
assert!(strategy.submit(watermark_msg).is_ok());
Expand Down
Loading
Loading