feat(streams): Consumer watermark commit latency metric#314
Conversation
Optional message_time on watermarks captures the last data message time at the pipeline head and the oldest row time for batch synthetic watermarks. WatermarkCommitOffsets records histogram streams.pipeline.consumer.watermark_commit_latency on commit (seconds since message_time, or 0 when unset). Co-authored-by: Cursor <cursoragent@cursor.com>
Co-authored-by: Cursor <cursoragent@cursor.com>
Address review on PR #314: align watermark message_time with the codebase convention of f64 message timestamps so we no longer floor sub-second precision when threading the value through native code, serialization, and Python. - Watermark, SerializableWatermark, PyWatermark, make_watermark_payload, Watermark::with_message_time and the Py<->Rust conversions now carry message_time as Option<f64> (Python float). - WatermarkEmitter and Batch read PyAnyMessage/RawMessage timestamps directly as f64; the streaming_payload_timestamp_floor_secs and data_message_time_secs_from_routed helpers are removed. - WatermarkCommitOffsets latency math becomes (current_epoch as f64) - oldest_message_time, clamped at 0 to absorb small clock skew. - rust_streams.pyi and the build_watermark test helper expose message_time as float | None. Drop the build_py_metrics_config docstring paragraph documenting the new metric (per review). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> Co-authored-by: Cursor <cursoragent@cursor.com>
Clarifies that the field refers to the last (or oldest batched) data row timestamp, not the watermark's own timestamp. - Watermark, PyWatermark, WatermarkTracker, and make_watermark_payload use last_message_time; Watermark::with_last_message_time replaces with_message_time. - SerializableWatermark serializes as last_message_time with serde alias message_time for older JSON payloads. - Batch helper renamed to oldest_batch_row_timestamp. Refs #314 (comment) Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> Co-authored-by: Cursor <cursoragent@cursor.com>
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
❌ Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.
Reviewed by Cursor Bugbot for commit 90b6fe0. Configure here.
| WatermarkTracker { | ||
| num_watermarks: tracker.num_watermarks + 1, | ||
| committable: tracker.committable.clone(), | ||
| time_added: tracker.time_added.clone(), | ||
| time_added: tracker.time_added, | ||
| last_message_time: tracker.last_message_time, | ||
| }, | ||
| ); | ||
| } |
There was a problem hiding this comment.
Bug: When two watermarks have the same timestamp, the last_message_time from the second one is discarded, potentially leading to inaccurate latency metrics.
Severity: MEDIUM
Suggested Fix
When a watermark arrives with a timestamp that is already being tracked, update the tracker's last_message_time to be the minimum of the existing value and the incoming watermark's last_message_time. This ensures the oldest message time is always tracked, fulfilling the intended behavior.
Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent. Verify if this is a real issue. If it is, propose a fix; if not, explain why it's
not valid.
Location: sentry_streams/src/commit_policy.rs#L118-L125
Potential issue: In a multi-branch pipeline, if two branches generate synthetic
watermarks within the same second, they will have identical timestamps but potentially
different `last_message_time` values. The `WatermarkTracker`'s `submit()` function only
keeps the `last_message_time` from the first watermark to arrive with that timestamp,
silently discarding the value from the second. This can lead to underreporting consumer
lag, as the oldest `last_message_time` across all branches might be ignored. This is
particularly relevant for router/filter pipelines with `BatchStep`s processing different
data subsets.

Adds a latency metrics to the consumer.
As we rely on watermarks for commit we can rely on watermarks for latency making the
product oblivious to the generation of the latency metric.
Specifically:
Made with Cursor