From acb1d00e9d1cdc8388f6d62280930cf971ad964c Mon Sep 17 00:00:00 2001 From: Filippo Pacifici Date: Wed, 6 May 2026 17:38:00 -0700 Subject: [PATCH 1/2] feat(streams): Add batch flush and GCS write metrics Record histograms for batch element count and open duration on flush, and a gauge for successful GCS object size keyed by route source. Co-authored-by: Cursor --- sentry_streams/src/batch_step.rs | 13 +++++++++++++ sentry_streams/src/gcs_writer.rs | 5 +++++ 2 files changed, 18 insertions(+) diff --git a/sentry_streams/src/batch_step.rs b/sentry_streams/src/batch_step.rs index c2a4d3a8..75ee3715 100644 --- a/sentry_streams/src/batch_step.rs +++ b/sentry_streams/src/batch_step.rs @@ -21,6 +21,10 @@ use sentry_arroyo::utils::timing::Deadline; use std::collections::{BTreeMap, VecDeque}; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; +const METRIC_BATCH_FLUSH_ELEMENTS: &str = "streams.pipeline.batch.flush.elements"; +const METRIC_BATCH_FLUSH_OPEN_DURATION_SECS: &str = + "streams.pipeline.batch.flush.open_duration_secs"; + fn first_element_schema(py: Python<'_>, first: &PyStreamingMessage) -> Option { match first { PyStreamingMessage::PyAnyMessage { content } => content.bind(py).borrow().schema.clone(), @@ -54,6 +58,8 @@ pub(crate) struct Batch { max_batch_size: Option, /// Set when the window is time-bounded; elapsed means flush by time. batch_deadline: Option, + /// Wall time when the first element opened this batch window. + created_at: Instant, elements: Vec, batch_offsets: BTreeMap, } @@ -82,6 +88,7 @@ impl Batch { route, max_batch_size, batch_deadline, + created_at: Instant::now(), elements: vec![first], batch_offsets, } @@ -258,9 +265,15 @@ impl BatchStep { // We create a synthetic watermark to avoid waiting for the next batch to complete before // allowing the consumer to commit. let committable_for_synthetic = b.current_offsets_snapshot(); + let batch_elements = b.len() as f64; + let batch_open_secs = b.created_at.elapsed().as_secs_f64(); let flush_start = Instant::now(); let batch_msg = b.flush()?; get_stats().step_timing(&self.step_name, flush_start.elapsed().as_secs_f64()); + let step_labels = vec![("step".to_string(), self.step_name.clone())]; + metrics::histogram!(METRIC_BATCH_FLUSH_ELEMENTS, &step_labels).record(batch_elements); + metrics::histogram!(METRIC_BATCH_FLUSH_OPEN_DURATION_SECS, &step_labels) + .record(batch_open_secs); self.batch = None; let wm_after_batch: Vec<_> = std::mem::take(&mut self.watermark_buffer); diff --git a/sentry_streams/src/gcs_writer.rs b/sentry_streams/src/gcs_writer.rs index e6782797..419ee525 100644 --- a/sentry_streams/src/gcs_writer.rs +++ b/sentry_streams/src/gcs_writer.rs @@ -21,6 +21,8 @@ use gcp_auth::{provider, TokenProvider}; use std::sync::Arc; use tokio::sync::OnceCell; +const METRIC_GCS_WRITE_BYTES: &str = "streams.pipeline.gcs.write.bytes"; + pub struct GCSWriter { client: Client, bucket: String, @@ -102,6 +104,7 @@ impl TaskRunner for GCSWriter { let pybytes_ms = pybytes_start.elapsed().as_millis(); let bytes_len = bytes.len(); + let route_source = self.route.source.clone(); let auth_provider_cell = self.auth_provider.clone(); @@ -176,6 +179,8 @@ impl TaskRunner for GCSWriter { token_ms, request_ms ); + let gcs_labels = vec![("source".to_string(), route_source.clone())]; + metrics::gauge!(METRIC_GCS_WRITE_BYTES, &gcs_labels).set(bytes_len as f64); Ok(message) } }) From 6e8d3243d0078051cd2d5044c9ab574a6c3b2945 Mon Sep 17 00:00:00 2001 From: Filippo Pacifici Date: Thu, 7 May 2026 14:38:30 -0700 Subject: [PATCH 2/2] ref(streams): Align pipeline metric names and types with review - Rename batch size/time metrics; record batch age in milliseconds - Use histogram for GCS sink bytes; rename to sink.gcs_writer.bytes Co-authored-by: Cursor --- sentry_streams/src/batch_step.rs | 12 +++++------- sentry_streams/src/gcs_writer.rs | 5 +++-- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/sentry_streams/src/batch_step.rs b/sentry_streams/src/batch_step.rs index 75ee3715..e84a003c 100644 --- a/sentry_streams/src/batch_step.rs +++ b/sentry_streams/src/batch_step.rs @@ -21,9 +21,8 @@ use sentry_arroyo::utils::timing::Deadline; use std::collections::{BTreeMap, VecDeque}; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; -const METRIC_BATCH_FLUSH_ELEMENTS: &str = "streams.pipeline.batch.flush.elements"; -const METRIC_BATCH_FLUSH_OPEN_DURATION_SECS: &str = - "streams.pipeline.batch.flush.open_duration_secs"; +const METRIC_BATCH_SIZE: &str = "streams.pipeline.batch.size"; +const METRIC_BATCH_TIME_MS: &str = "streams.pipeline.batch.time_ms"; fn first_element_schema(py: Python<'_>, first: &PyStreamingMessage) -> Option { match first { @@ -266,14 +265,13 @@ impl BatchStep { // allowing the consumer to commit. let committable_for_synthetic = b.current_offsets_snapshot(); let batch_elements = b.len() as f64; - let batch_open_secs = b.created_at.elapsed().as_secs_f64(); + let batch_open_ms = b.created_at.elapsed().as_millis() as f64; let flush_start = Instant::now(); let batch_msg = b.flush()?; get_stats().step_timing(&self.step_name, flush_start.elapsed().as_secs_f64()); let step_labels = vec![("step".to_string(), self.step_name.clone())]; - metrics::histogram!(METRIC_BATCH_FLUSH_ELEMENTS, &step_labels).record(batch_elements); - metrics::histogram!(METRIC_BATCH_FLUSH_OPEN_DURATION_SECS, &step_labels) - .record(batch_open_secs); + metrics::histogram!(METRIC_BATCH_SIZE, &step_labels).record(batch_elements); + metrics::histogram!(METRIC_BATCH_TIME_MS, &step_labels).record(batch_open_ms); self.batch = None; let wm_after_batch: Vec<_> = std::mem::take(&mut self.watermark_buffer); diff --git a/sentry_streams/src/gcs_writer.rs b/sentry_streams/src/gcs_writer.rs index 419ee525..94e00f8f 100644 --- a/sentry_streams/src/gcs_writer.rs +++ b/sentry_streams/src/gcs_writer.rs @@ -21,7 +21,7 @@ use gcp_auth::{provider, TokenProvider}; use std::sync::Arc; use tokio::sync::OnceCell; -const METRIC_GCS_WRITE_BYTES: &str = "streams.pipeline.gcs.write.bytes"; +const METRIC_SINK_GCS_WRITER_BYTES: &str = "streams.pipeline.sink.gcs_writer.bytes"; pub struct GCSWriter { client: Client, @@ -180,7 +180,8 @@ impl TaskRunner for GCSWriter { request_ms ); let gcs_labels = vec![("source".to_string(), route_source.clone())]; - metrics::gauge!(METRIC_GCS_WRITE_BYTES, &gcs_labels).set(bytes_len as f64); + metrics::histogram!(METRIC_SINK_GCS_WRITER_BYTES, &gcs_labels) + .record(bytes_len as f64); Ok(message) } })