Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions sentry_streams/src/batch_step.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ use sentry_arroyo::utils::timing::Deadline;
use std::collections::{BTreeMap, VecDeque};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

/// Histogram metric name for the number of elements held by a batch, sampled
/// right before the batch is flushed and labelled with the step name.
const METRIC_BATCH_SIZE: &str = "streams.pipeline.batch.size";
/// Histogram metric name for how long a batch stayed open, in milliseconds,
/// measured from the batch's `created_at` instant until right before it is flushed.
const METRIC_BATCH_TIME_MS: &str = "streams.pipeline.batch.time_ms";

fn first_element_schema(py: Python<'_>, first: &PyStreamingMessage) -> Option<String> {
match first {
PyStreamingMessage::PyAnyMessage { content } => content.bind(py).borrow().schema.clone(),
Expand Down Expand Up @@ -54,6 +57,8 @@ pub(crate) struct Batch {
max_batch_size: Option<usize>,
/// Set when the window is time-bounded; elapsed means flush by time.
batch_deadline: Option<Deadline>,
/// Monotonic instant (`Instant::now()`, not wall-clock time) captured when the first element opened this batch window.
created_at: Instant,
elements: Vec<PyStreamingMessage>,
batch_offsets: BTreeMap<Partition, u64>,
}
Expand Down Expand Up @@ -82,6 +87,7 @@ impl Batch {
route,
max_batch_size,
batch_deadline,
created_at: Instant::now(),
elements: vec![first],
batch_offsets,
}
Expand Down Expand Up @@ -258,9 +264,14 @@ impl BatchStep {
// We create a synthetic watermark to avoid waiting for the next batch to complete before
// allowing the consumer to commit.
let committable_for_synthetic = b.current_offsets_snapshot();
let batch_elements = b.len() as f64;
let batch_open_ms = b.created_at.elapsed().as_millis() as f64;
let flush_start = Instant::now();
let batch_msg = b.flush()?;
get_stats().step_timing(&self.step_name, flush_start.elapsed().as_secs_f64());
let step_labels = vec![("step".to_string(), self.step_name.clone())];
metrics::histogram!(METRIC_BATCH_SIZE, &step_labels).record(batch_elements);
metrics::histogram!(METRIC_BATCH_TIME_MS, &step_labels).record(batch_open_ms);
self.batch = None;
let wm_after_batch: Vec<_> = std::mem::take(&mut self.watermark_buffer);

Expand Down
6 changes: 6 additions & 0 deletions sentry_streams/src/gcs_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ use gcp_auth::{provider, TokenProvider};
use std::sync::Arc;
use tokio::sync::OnceCell;

/// Histogram metric name for the payload size in bytes (`bytes.len()`) handled by
/// the GCS writer, recorded with a `source` label taken from the route.
const METRIC_SINK_GCS_WRITER_BYTES: &str = "streams.pipeline.sink.gcs_writer.bytes";

pub struct GCSWriter {
client: Client,
bucket: String,
Expand Down Expand Up @@ -102,6 +104,7 @@ impl TaskRunner<RoutedValue, RoutedValue, anyhow::Error> for GCSWriter {
let pybytes_ms = pybytes_start.elapsed().as_millis();

let bytes_len = bytes.len();
let route_source = self.route.source.clone();

let auth_provider_cell = self.auth_provider.clone();

Expand Down Expand Up @@ -176,6 +179,9 @@ impl TaskRunner<RoutedValue, RoutedValue, anyhow::Error> for GCSWriter {
token_ms,
request_ms
);
let gcs_labels = vec![("source".to_string(), route_source.clone())];
metrics::histogram!(METRIC_SINK_GCS_WRITER_BYTES, &gcs_labels)
.record(bytes_len as f64);
Ok(message)
}
})
Expand Down
Loading