diff --git a/sentry_streams/src/batch_step.rs b/sentry_streams/src/batch_step.rs index c2a4d3a8..e84a003c 100644 --- a/sentry_streams/src/batch_step.rs +++ b/sentry_streams/src/batch_step.rs @@ -21,6 +21,9 @@ use sentry_arroyo::utils::timing::Deadline; use std::collections::{BTreeMap, VecDeque}; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; +const METRIC_BATCH_SIZE: &str = "streams.pipeline.batch.size"; +const METRIC_BATCH_TIME_MS: &str = "streams.pipeline.batch.time_ms"; + fn first_element_schema(py: Python<'_>, first: &PyStreamingMessage) -> Option { match first { PyStreamingMessage::PyAnyMessage { content } => content.bind(py).borrow().schema.clone(), @@ -54,6 +57,8 @@ pub(crate) struct Batch { max_batch_size: Option, /// Set when the window is time-bounded; elapsed means flush by time. batch_deadline: Option, + /// Wall time when the first element opened this batch window. + created_at: Instant, elements: Vec, batch_offsets: BTreeMap, } @@ -82,6 +87,7 @@ impl Batch { route, max_batch_size, batch_deadline, + created_at: Instant::now(), elements: vec![first], batch_offsets, } @@ -258,9 +264,14 @@ impl BatchStep { // We create a synthetic watermark to avoid waiting for the next batch to complete before // allowing the consumer to commit. let committable_for_synthetic = b.current_offsets_snapshot(); + let batch_elements = b.len() as f64; + let batch_open_ms = b.created_at.elapsed().as_millis() as f64; let flush_start = Instant::now(); let batch_msg = b.flush()?; get_stats().step_timing(&self.step_name, flush_start.elapsed().as_secs_f64()); + let step_labels = vec![("step".to_string(), self.step_name.clone())]; + metrics::histogram!(METRIC_BATCH_SIZE, &step_labels).record(batch_elements); + metrics::histogram!(METRIC_BATCH_TIME_MS, &step_labels).record(batch_open_ms); self.batch = None; let wm_after_batch: Vec<_> = std::mem::take(&mut self.watermark_buffer); diff --git a/sentry_streams/src/gcs_writer.rs b/sentry_streams/src/gcs_writer.rs index e6782797..94e00f8f 100644 --- a/sentry_streams/src/gcs_writer.rs +++ b/sentry_streams/src/gcs_writer.rs @@ -21,6 +21,8 @@ use gcp_auth::{provider, TokenProvider}; use std::sync::Arc; use tokio::sync::OnceCell; +const METRIC_SINK_GCS_WRITER_BYTES: &str = "streams.pipeline.sink.gcs_writer.bytes"; + pub struct GCSWriter { client: Client, bucket: String, @@ -102,6 +104,7 @@ impl TaskRunner for GCSWriter { let pybytes_ms = pybytes_start.elapsed().as_millis(); let bytes_len = bytes.len(); + let route_source = self.route.source.clone(); let auth_provider_cell = self.auth_provider.clone(); @@ -176,6 +179,9 @@ impl TaskRunner for GCSWriter { token_ms, request_ms ); + let gcs_labels = vec![("source".to_string(), route_source.clone())]; + metrics::histogram!(METRIC_SINK_GCS_WRITER_BYTES, &gcs_labels) + .record(bytes_len as f64); Ok(message) } })