From 59f67d0aac57059a423b280c89643499732b481c Mon Sep 17 00:00:00 2001 From: vianney Date: Mon, 22 Jun 2026 13:03:40 +0200 Subject: [PATCH 1/4] chore(protobuf): add serde to trilean --- libdd-trace-protobuf/build.rs | 2 ++ libdd-trace-protobuf/src/pb.rs | 1 + 2 files changed, 3 insertions(+) diff --git a/libdd-trace-protobuf/build.rs b/libdd-trace-protobuf/build.rs index c9c891a681..d0f7525912 100644 --- a/libdd-trace-protobuf/build.rs +++ b/libdd-trace-protobuf/build.rs @@ -240,6 +240,8 @@ fn generate_protobuf() { "#[serde(rename = \"srv_src\")]", ); + config.type_attribute("Trilean", "#[derive(Deserialize, Serialize)]"); + // idx module type attributes config.type_attribute("pb.idx.AnyValue", "#[derive(Deserialize, Serialize)]"); config.type_attribute( diff --git a/libdd-trace-protobuf/src/pb.rs b/libdd-trace-protobuf/src/pb.rs index feae884fb7..b3085ac0b6 100644 --- a/libdd-trace-protobuf/src/pb.rs +++ b/libdd-trace-protobuf/src/pb.rs @@ -668,6 +668,7 @@ pub struct ClientGroupedStats { >, } /// Trilean is an expanded boolean type that is meant to differentiate between being unset and false. 
+#[derive(Deserialize, Serialize)] #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] #[repr(i32)] pub enum Trilean { From b0295cdf6dfc87ffcfebb6fbff6e24ff550c0187 Mon Sep 17 00:00:00 2001 From: vianney Date: Mon, 22 Jun 2026 13:52:47 +0200 Subject: [PATCH 2/4] chore(stats): use trilean for is_trace_root --- datadog-ipc/src/shm_stats.rs | 8 +-- .../src/span_concentrator/aggregation.rs | 67 ++++++++++--------- 2 files changed, 38 insertions(+), 37 deletions(-) diff --git a/datadog-ipc/src/shm_stats.rs b/datadog-ipc/src/shm_stats.rs index 44941ec667..96497d1169 100644 --- a/datadog-ipc/src/shm_stats.rs +++ b/datadog-ipc/src/shm_stats.rs @@ -805,11 +805,7 @@ impl ShmSpanConcentrator { top_level_hits, span_kind: read_str!(f.span_kind), peer_tags, - is_trace_root: if f.is_trace_root { - pb::Trilean::True.into() - } else { - pb::Trilean::False.into() - }, + is_trace_root: f.is_trace_root.into(), http_method: read_str!(f.http_method), http_endpoint: read_str!(f.http_endpoint), grpc_status_code: f @@ -856,7 +852,7 @@ mod tests { service_source: "", http_status_code: 200, is_synthetics_request: false, - is_trace_root: true, + is_trace_root: pb::Trilean::True, grpc_status_code: None, }, peer_tags: &[], diff --git a/libdd-trace-stats/src/span_concentrator/aggregation.rs b/libdd-trace-stats/src/span_concentrator/aggregation.rs index acc7629478..f011eba787 100644 --- a/libdd-trace-stats/src/span_concentrator/aggregation.rs +++ b/libdd-trace-stats/src/span_concentrator/aggregation.rs @@ -13,6 +13,9 @@ use std::borrow::{Borrow, Cow}; use crate::span_concentrator::StatSpan; +/// Sentinel value used for cardinality limiting. 
+pub const TRACER_BLOCKED_VALUE: &str = "tracer_blocked_value"; + const TAG_STATUS_CODE: &str = "http.status_code"; const TAG_SYNTHETICS: &str = "synthetics"; const TAG_SPANKIND: &str = "span.kind"; @@ -47,7 +50,7 @@ pub struct FixedAggregationKey { pub http_status_code: u32, pub grpc_status_code: Option, pub is_synthetics_request: bool, - pub is_trace_root: bool, + pub is_trace_root: pb::Trilean, } impl FixedAggregationKey { @@ -268,7 +271,11 @@ impl<'a> BorrowedAggregationKey<'a> { is_synthetics_request: span .get_meta(TAG_ORIGIN) .is_some_and(|origin| origin.starts_with(TAG_SYNTHETICS)), - is_trace_root: span.is_trace_root(), + is_trace_root: if span.is_trace_root() { + pb::Trilean::True + } else { + pb::Trilean::False + }, }, peer_tags, } @@ -290,7 +297,8 @@ impl From for OwnedAggregationKey { http_status_code: value.http_status_code, grpc_status_code: value.grpc_status_code.parse().ok(), is_synthetics_request: value.synthetics, - is_trace_root: value.is_trace_root == 1, + is_trace_root: pb::Trilean::try_from(value.is_trace_root) + .unwrap_or(pb::Trilean::NotSet), }, peer_tags: value .peer_tags @@ -419,11 +427,7 @@ fn encode_grouped_stats(key: OwnedAggregationKey, group: GroupedStats) -> pb::Cl .into_iter() .map(|(k, v)| format!("{k}:{v}")) .collect(), - is_trace_root: if f.is_trace_root { - pb::Trilean::True.into() - } else { - pb::Trilean::False.into() - }, + is_trace_root: f.is_trace_root.into(), http_method: f.http_method, http_endpoint: f.http_endpoint, grpc_status_code: f @@ -440,6 +444,7 @@ mod tests { use libdd_trace_utils::span::v04::{SpanBytes, SpanSlice}; use super::*; + use libdd_trace_protobuf::pb; use std::hash::Hash; fn get_hash(v: &impl Hash) -> u64 { @@ -481,7 +486,7 @@ mod tests { service_name: "service".into(), operation_name: "op".into(), resource_name: "res".into(), - is_trace_root: true, + is_trace_root: pb::Trilean::True, ..Default::default() } .into_key(), @@ -502,7 +507,7 @@ mod tests { operation_name: "op".into(), resource_name: 
"res".into(), span_kind: "client".into(), - is_trace_root: true, + is_trace_root: pb::Trilean::True, ..Default::default() } .into_key(), @@ -527,7 +532,7 @@ mod tests { operation_name: "op".into(), resource_name: "res".into(), span_kind: "client".into(), - is_trace_root: true, + is_trace_root: pb::Trilean::True, ..Default::default() } .into_key(), @@ -554,7 +559,7 @@ mod tests { operation_name: "op".into(), resource_name: "res".into(), span_kind: "producer".into(), - is_trace_root: true, + is_trace_root: pb::Trilean::True, ..Default::default() } .into_key(), @@ -582,7 +587,7 @@ mod tests { operation_name: "op".into(), resource_name: "res".into(), span_kind: "server".into(), - is_trace_root: true, + is_trace_root: pb::Trilean::True, ..Default::default() } .into_key(), @@ -603,7 +608,7 @@ mod tests { operation_name: "op".into(), resource_name: "res".into(), is_synthetics_request: true, - is_trace_root: true, + is_trace_root: pb::Trilean::True, ..Default::default() } .into_key(), @@ -624,7 +629,7 @@ mod tests { operation_name: "op".into(), resource_name: "res".into(), is_synthetics_request: false, - is_trace_root: true, + is_trace_root: pb::Trilean::True, http_status_code: 418, ..Default::default() } @@ -646,7 +651,7 @@ mod tests { operation_name: "op".into(), resource_name: "res".into(), is_synthetics_request: false, - is_trace_root: true, + is_trace_root: pb::Trilean::True, ..Default::default() } .into_key(), @@ -667,7 +672,7 @@ mod tests { operation_name: "op".into(), resource_name: "res".into(), is_synthetics_request: false, - is_trace_root: true, + is_trace_root: pb::Trilean::True, http_status_code: 418, ..Default::default() } @@ -695,7 +700,7 @@ mod tests { http_method: "GET".into(), http_endpoint: "/api/v1/users".into(), is_synthetics_request: false, - is_trace_root: true, + is_trace_root: pb::Trilean::True, ..Default::default() } .into_key(), @@ -723,7 +728,7 @@ mod tests { http_method: "POST".into(), http_endpoint: "/users/create2".into(), 
is_synthetics_request: false, - is_trace_root: true, + is_trace_root: pb::Trilean::True, ..Default::default() } .into_key(), @@ -736,7 +741,7 @@ mod tests { }, FixedAggregationKey { grpc_status_code: Some(0), - is_trace_root: true, + is_trace_root: pb::Trilean::True, ..Default::default() } .into_key(), @@ -749,7 +754,7 @@ mod tests { }, FixedAggregationKey { grpc_status_code: Some(14), - is_trace_root: true, + is_trace_root: pb::Trilean::True, ..Default::default() } .into_key(), @@ -762,7 +767,7 @@ mod tests { }, FixedAggregationKey { grpc_status_code: Some(14), - is_trace_root: true, + is_trace_root: pb::Trilean::True, ..Default::default() } .into_key(), @@ -776,7 +781,7 @@ mod tests { }, FixedAggregationKey { grpc_status_code: Some(7), - is_trace_root: true, + is_trace_root: pb::Trilean::True, ..Default::default() } .into_key(), @@ -789,7 +794,7 @@ mod tests { }, FixedAggregationKey { grpc_status_code: Some(3), - is_trace_root: true, + is_trace_root: pb::Trilean::True, ..Default::default() } .into_key(), @@ -801,7 +806,7 @@ mod tests { ..Default::default() }, FixedAggregationKey { - is_trace_root: true, + is_trace_root: pb::Trilean::True, ..Default::default() } .into_key(), @@ -821,7 +826,7 @@ mod tests { service_name: "my-service".into(), operation_name: "op".into(), resource_name: "res".into(), - is_trace_root: true, + is_trace_root: pb::Trilean::True, service_source: "redis".into(), ..Default::default() } @@ -842,7 +847,7 @@ mod tests { service_name: "my-service".into(), operation_name: "op".into(), resource_name: "res".into(), - is_trace_root: true, + is_trace_root: pb::Trilean::True, service_source: "opt.split_by_tag".into(), ..Default::default() } @@ -862,7 +867,7 @@ mod tests { service_name: "my-service".into(), operation_name: "op".into(), resource_name: "res".into(), - is_trace_root: true, + is_trace_root: pb::Trilean::True, service_source: "".into(), ..Default::default() } @@ -893,7 +898,7 @@ mod tests { operation_name: "op".into(), resource_name: 
"res".into(), span_kind: "client".into(), - is_trace_root: true, + is_trace_root: pb::Trilean::True, ..Default::default() } .into_key_with_peers(vec![("aws.s3.bucket".into(), "bucket-a".into())]), @@ -920,7 +925,7 @@ mod tests { operation_name: "op".into(), resource_name: "res".into(), span_kind: "producer".into(), - is_trace_root: true, + is_trace_root: pb::Trilean::True, ..Default::default() } .into_key_with_peers(vec![ @@ -952,7 +957,7 @@ mod tests { operation_name: "op".into(), resource_name: "res".into(), span_kind: "server".into(), - is_trace_root: true, + is_trace_root: pb::Trilean::True, ..Default::default() } .into_key(), From e9d4434477cf5897a664e9019707b20fae17e8a8 Mon Sep 17 00:00:00 2001 From: vianney Date: Mon, 22 Jun 2026 15:06:13 +0200 Subject: [PATCH 3/4] feat(stats): add whole-key cardinality limit --- .../src/trace_exporter/stats.rs | 1 + .../benches/span_concentrator_bench.rs | 1 + .../src/span_concentrator/aggregation.rs | 55 +++- .../src/span_concentrator/mod.rs | 25 +- .../src/span_concentrator/tests.rs | 275 +++++++++++++++++- libdd-trace-stats/src/stats_exporter.rs | 1 + 6 files changed, 350 insertions(+), 8 deletions(-) diff --git a/libdd-data-pipeline/src/trace_exporter/stats.rs b/libdd-data-pipeline/src/trace_exporter/stats.rs index 93d668781b..b97cf1c296 100644 --- a/libdd-data-pipeline/src/trace_exporter/stats.rs +++ b/libdd-data-pipeline/src/trace_exporter/stats.rs @@ -134,6 +134,7 @@ pub(crate) fn start_stats_computation< std::time::SystemTime::now(), span_kinds, peer_tags, + None, #[cfg(feature = "stats-obfuscation")] Some(client_side_stats.obfuscation_config.clone()), ))); diff --git a/libdd-trace-stats/benches/span_concentrator_bench.rs b/libdd-trace-stats/benches/span_concentrator_bench.rs index 03526acb3e..1e162cb4a2 100644 --- a/libdd-trace-stats/benches/span_concentrator_bench.rs +++ b/libdd-trace-stats/benches/span_concentrator_bench.rs @@ -44,6 +44,7 @@ pub fn criterion_benchmark(c: &mut Criterion) { now, vec![], 
vec!["db_name".into(), "bucket_s3".into()], + None, #[cfg(feature = "stats-obfuscation")] None, ); diff --git a/libdd-trace-stats/src/span_concentrator/aggregation.rs b/libdd-trace-stats/src/span_concentrator/aggregation.rs index f011eba787..4b0008e835 100644 --- a/libdd-trace-stats/src/span_concentrator/aggregation.rs +++ b/libdd-trace-stats/src/span_concentrator/aggregation.rs @@ -282,6 +282,29 @@ impl<'a> BorrowedAggregationKey<'a> { } } +impl OwnedAggregationKey { + /// Return the overflow sentinel key. + pub(super) fn overflow_key() -> Self { + OwnedAggregationKey { + fixed: FixedAggregationKey { + resource_name: TRACER_BLOCKED_VALUE.to_owned(), + service_name: TRACER_BLOCKED_VALUE.to_owned(), + operation_name: TRACER_BLOCKED_VALUE.to_owned(), + span_type: TRACER_BLOCKED_VALUE.to_owned(), + span_kind: TRACER_BLOCKED_VALUE.to_owned(), + http_method: TRACER_BLOCKED_VALUE.to_owned(), + http_endpoint: TRACER_BLOCKED_VALUE.to_owned(), + service_source: TRACER_BLOCKED_VALUE.to_owned(), + http_status_code: 0, + grpc_status_code: None, + is_synthetics_request: false, + is_trace_root: pb::Trilean::NotSet, + }, + peer_tags: vec![], + } + } +} + impl From for OwnedAggregationKey { fn from(value: pb::ClientGroupedStats) -> Self { Self { @@ -358,19 +381,35 @@ impl GroupedStats { pub(super) struct StatsBucket { data: HashMap, start: u64, + /// Maximum number of distinct aggregation keys this bucket will hold before collapsing new + /// ones into the overflow sentinel key. + max_entries: usize, + /// Number of spans collapsed into the overflow bucket due to cardinality limiting. + collapsed_count: u64, } impl StatsBucket { - /// Return a new StatsBucket starting at the given timestamp - pub(super) fn new(start_timestamp: u64) -> Self { + /// Return a new StatsBucket starting at `start_timestamp`. + /// + /// `max_entries` is the maximum number of distinct aggregation keys the bucket will hold. 
+ /// Once the limit is reached, new distinct keys are collapsed into the overflow sentinel key. + pub(super) fn new(start_timestamp: u64, max_entries: usize) -> Self { Self { data: HashMap::new(), start: start_timestamp, + max_entries, + collapsed_count: 0, } } - /// Insert a value as stats in the group corresponding to the aggregation key, if it does - /// not exist it creates it. + /// Return the number of spans collapsed into the overflow bucket. + pub(super) fn collapsed_count(&self) -> u64 { + self.collapsed_count + } + + /// Insert a value as stats in the group corresponding to the aggregation key. If the key is new + /// and the `max_entries` limit has not been reached, a new entry is created, else the span is + /// instead merged into the overflow sentinel key. pub(super) fn insert( &mut self, key: BorrowedAggregationKey<'_>, @@ -378,6 +417,14 @@ impl StatsBucket { is_error: bool, is_top_level: bool, ) { + if self.data.len() >= self.max_entries && !self.data.contains_key(&key) { + self.collapsed_count += 1; + self.data + .entry(OwnedAggregationKey::overflow_key()) + .or_default() + .insert(duration, is_error, is_top_level); + return; + } self.data .entry_ref(&key) .or_default() diff --git a/libdd-trace-stats/src/span_concentrator/mod.rs b/libdd-trace-stats/src/span_concentrator/mod.rs index 1e83a4b342..8a9a30d3de 100644 --- a/libdd-trace-stats/src/span_concentrator/mod.rs +++ b/libdd-trace-stats/src/span_concentrator/mod.rs @@ -65,6 +65,9 @@ pub struct StatsComputationObfuscationConfig { pub type SharedStatsComputationObfuscationConfig = std::sync::Arc>; +/// Default maximum number of distinct aggregation keys per time bucket. 
+pub const DEFAULT_MAX_ENTRIES_PER_BUCKET: usize = 5_000; + /// SpanConcentrator compute stats on span aggregated by time and span attributes /// /// # Aggregation @@ -80,6 +83,11 @@ pub type SharedStatsComputationObfuscationConfig = /// When the SpanConcentrator is flushed it keeps the `buffer_len` most recent buckets and remove /// all older buckets returning their content. When using force flush all buckets are flushed /// regardless of their age. +/// +/// # Cardinality limiting +/// Each time bucket holds at most `max_entries_per_bucket` distinct aggregation keys. Once that +/// limit is reached, spans whose key is not already present are merged into a single overflow +/// bucket keyed by [`aggregation::TRACER_BLOCKED_VALUE`]. #[derive(Debug, Clone)] pub struct SpanConcentrator { /// Size of the time buckets used for aggregation in nanos @@ -90,6 +98,8 @@ pub struct SpanConcentrator { oldest_timestamp: u64, /// bufferLen is the number stats bucket we keep when flushing. buffer_len: usize, + /// Maximum number of distinct aggregation keys per bucket. + max_entries_per_bucket: usize, /// span.kind fields eligible for stats computation span_kinds_stats_computed: Vec, /// keys for supplementary tags that describe peer.service entities @@ -104,12 +114,15 @@ impl SpanConcentrator { /// - `now` the current system time, used to define the oldest bucket /// - `span_kinds_stats_computed` list of span kinds eligible for stats computation /// - `peer_tags_keys` list of keys considered as peer tags for aggregation + /// - `override_max_entries_per_bucket` maximum distinct aggregation keys per time bucket before + /// cardinality limiting applies. Pass `None` to use [`DEFAULT_MAX_ENTRIES_PER_BUCKET`]. 
/// - `obfuscation_config` optional and updatable config for resource key obfuscation pub fn new( bucket_size: Duration, now: SystemTime, span_kinds_stats_computed: Vec, peer_tag_keys: Vec, + override_max_entries_per_bucket: Option, #[cfg(feature = "stats-obfuscation")] obfuscation_config: Option< SharedStatsComputationObfuscationConfig, >, @@ -122,6 +135,8 @@ impl SpanConcentrator { bucket_size.as_nanos() as u64, ), buffer_len: 2, + max_entries_per_bucket: override_max_entries_per_bucket + .unwrap_or(DEFAULT_MAX_ENTRIES_PER_BUCKET), span_kinds_stats_computed, peer_tag_keys, #[cfg(feature = "stats-obfuscation")] @@ -178,7 +193,7 @@ impl SpanConcentrator { }; self.buckets .entry(bucket_timestamp) - .or_insert(StatsBucket::new(bucket_timestamp)) + .or_insert_with(|| StatsBucket::new(bucket_timestamp, self.max_entries_per_bucket)) .insert( agg_key, span.duration(), @@ -216,7 +231,8 @@ impl SpanConcentrator { align_timestamp(now_timestamp, self.bucket_size) - (self.buffer_len as u64 - 1) * self.bucket_size }; - buckets + let mut total_collapsed = 0; + let buckets_pb = buckets .into_iter() .filter_map(|(timestamp, bucket)| { // Always keep `bufferLen` buckets (default is 2: current + previous one). @@ -231,9 +247,12 @@ impl SpanConcentrator { self.buckets.insert(timestamp, bucket); return None; } + total_collapsed += bucket.collapsed_count(); Some(bucket.flush(self.bucket_size)) }) - .collect() + .collect(); + //TODO send telemetry + buckets_pb } } diff --git a/libdd-trace-stats/src/span_concentrator/tests.rs b/libdd-trace-stats/src/span_concentrator/tests.rs index 56039c4d5b..11f43e1daf 100644 --- a/libdd-trace-stats/src/span_concentrator/tests.rs +++ b/libdd-trace-stats/src/span_concentrator/tests.rs @@ -1,7 +1,7 @@ // Copyright 2021-Present Datadog, Inc. 
https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -use crate::span_concentrator::aggregation::OwnedAggregationKey; +use crate::span_concentrator::aggregation::{OwnedAggregationKey, TRACER_BLOCKED_VALUE}; use super::*; use libdd_trace_utils::span::v04::VecMap; @@ -105,6 +105,7 @@ fn test_concentrator_oldest_timestamp_cold() { now, vec![], vec![], + None, #[cfg(feature = "stats-obfuscation")] None, ); @@ -161,6 +162,7 @@ fn test_concentrator_oldest_timestamp_hot() { now, vec![], vec![], + None, #[cfg(feature = "stats-obfuscation")] None, ); @@ -240,6 +242,7 @@ fn test_concentrator_stats_totals() { now, vec![], vec![], + None, #[cfg(feature = "stats-obfuscation")] None, ); @@ -306,6 +309,7 @@ fn test_concentrator_stats_counts() { now, vec![], vec![], + None, #[cfg(feature = "stats-obfuscation")] None, ); @@ -603,6 +607,7 @@ fn test_span_should_be_included_in_stats() { now, get_span_kinds(), vec![], + None, #[cfg(feature = "stats-obfuscation")] None, ); @@ -683,6 +688,7 @@ fn test_ignore_partial_spans() { now, get_span_kinds(), vec![], + None, #[cfg(feature = "stats-obfuscation")] None, ); @@ -708,6 +714,7 @@ fn test_force_flush() { now, get_span_kinds(), vec![], + None, #[cfg(feature = "stats-obfuscation")] None, ); @@ -791,6 +798,7 @@ fn test_peer_tags_aggregation() { now, get_span_kinds(), vec![], + None, #[cfg(feature = "stats-obfuscation")] None, ); @@ -799,6 +807,7 @@ fn test_peer_tags_aggregation() { now, get_span_kinds(), vec!["db.instance".to_string(), "db.system".to_string()], + None, #[cfg(feature = "stats-obfuscation")] None, ); @@ -982,6 +991,7 @@ fn test_peer_tags_quantization_aggregation() { "db.system".to_string(), "peer.hostname".to_string(), ], + None, #[cfg(feature = "stats-obfuscation")] None, ); @@ -1108,6 +1118,7 @@ fn test_base_service_peer_tag() { now, get_span_kinds(), vec!["db.instance".to_string(), "db.system".to_string()], + None, #[cfg(feature = "stats-obfuscation")] None, ); @@ -1330,6 +1341,7 @@ fn test_pb_span() { now, 
get_span_kinds(), vec!["db.instance".to_string(), "db.system".to_string()], + None, #[cfg(feature = "stats-obfuscation")] None, ); @@ -1569,3 +1581,264 @@ fn test_pb_span() { assert_counts_equal(expected_stats, bucket.stats.clone()); } + +/// Build a minimal concentrator with a tiny `max_entries_per_bucket` for cardinality tests. +fn make_cardinality_concentrator(max_entries: usize) -> SpanConcentrator { + let now = SystemTime::now(); + SpanConcentrator::new( + Duration::from_nanos(BUCKET_SIZE), + now, + get_span_kinds(), + vec![], + Some(max_entries), + #[cfg(feature = "stats-obfuscation")] + None, + ) +} + +/// When the limit is 3 and we insert 5 distinct-resource spans, only 3 normal keys plus one +/// overflow key must appear in the flushed stats. Total hits must equal 5. +#[test] +fn test_cardinality_limit_collapse() { + let now = SystemTime::now(); + let limit: usize = 3; + let mut concentrator = make_cardinality_concentrator(limit); + + // Insert limit + 2 distinct-resource root spans all in the same time bucket. + let resources: Vec = (0..limit + 2).map(|i| format!("resource-{i}")).collect(); + for (i, resource) in resources.iter().enumerate() { + let span = get_test_span_with_meta( + now, + i as u64 + 1, + 0, + 100, + 2, + "svc", + resource, + 0, + &[], + &[("_dd.measured", 1.0)], + ); + concentrator.add_span(&span); + } + + let buckets = concentrator.flush(SystemTime::now(), true); + assert!(!buckets.is_empty(), "should get at least one time bucket"); + + let stats = &buckets[0].stats; + + // Exactly limit normal keys + 1 overflow key. + assert_eq!( + stats.len(), + limit + 1, + "expected {limit} normal groups + 1 overflow group, got {}", + stats.len() + ); + + // Total hits must be preserved. + let total_hits: u64 = stats.iter().map(|g| g.hits).sum(); + assert_eq!( + total_hits, + (limit + 2) as u64, + "total hits must equal the number of inserted spans" + ); + + // Exactly one overflow group, identified by the sentinel resource. 
+ let overflow_groups: Vec<_> = stats + .iter() + .filter(|g| g.resource == TRACER_BLOCKED_VALUE) + .collect(); + assert_eq!( + overflow_groups.len(), + 1, + "expected exactly one overflow group" + ); +} + +/// The overflow bucket must correctly aggregate the hits from overflow spans. +#[test] +fn test_overflow_bucket_counts() { + let now = SystemTime::now(); + let limit: usize = 1; + let mut concentrator = make_cardinality_concentrator(limit); + + // First span fills the sole slot; the next 4 spans all have distinct keys → all overflow. + for i in 0..5usize { + let resource = format!("resource-{i}"); + let span = get_test_span_with_meta( + now, + i as u64 + 1, + 0, + 10 * (i as i64 + 1), + 2, + "svc", + &resource, + 0, + &[], + &[("_dd.measured", 1.0)], + ); + concentrator.add_span(&span); + } + + let buckets = concentrator.flush(SystemTime::now(), true); + assert!(!buckets.is_empty()); + let stats = &buckets[0].stats; + + // There must be exactly 2 groups: 1 normal + 1 overflow. + assert_eq!( + stats.len(), + 2, + "expected exactly 1 normal + 1 overflow group" + ); + + let overflow = stats + .iter() + .find(|g| g.resource == TRACER_BLOCKED_VALUE) + .expect("overflow group must exist"); + + // 4 spans overflowed, total duration = 20 + 30 + 40 + 50 = 140. + assert_eq!(overflow.hits, 4, "all 4 overflow spans must be merged"); + assert_eq!( + overflow.duration, 140, + "overflow durations must sum correctly" + ); +} + +/// When the number of distinct spans is within the limit, no overflow bucket should appear. +#[test] +fn test_no_collapse_within_limit() { + let now = SystemTime::now(); + let limit: usize = 10; + let mut concentrator = make_cardinality_concentrator(limit); + + // Insert exactly `limit` distinct-resource spans — no overflow expected. 
+ for i in 0..limit { + let resource = format!("resource-{i}"); + let span = get_test_span_with_meta( + now, + i as u64 + 1, + 0, + 50, + 2, + "svc", + &resource, + 0, + &[], + &[("_dd.measured", 1.0)], + ); + concentrator.add_span(&span); + } + + let buckets = concentrator.flush(SystemTime::now(), true); + assert!(!buckets.is_empty()); + let stats = &buckets[0].stats; + + assert_eq!( + stats.len(), + limit, + "expected exactly {limit} groups with no overflow" + ); + assert!( + stats.iter().all(|g| g.resource != TRACER_BLOCKED_VALUE), + "no overflow group should be present within the limit" + ); +} + +/// The overflow `ClientGroupedStats` row must carry `tracer_blocked_value` on all sentinel +/// string fields as specified by the RFC. +#[test] +fn test_overflow_bucket_key_sentinel_values() { + let now = SystemTime::now(); + let limit: usize = 1; + let mut concentrator = make_cardinality_concentrator(limit); + + // First span occupies the only slot; second one overflows. + let first = get_test_span_with_meta( + now, + 1, + 0, + 50, + 2, + "my-service", + "my-resource", + 0, + &[], + &[("_dd.measured", 1.0)], + ); + let second = get_test_span_with_meta( + now, + 2, + 0, + 75, + 2, + "other-service", + "other-resource", + 0, + &[], + &[("_dd.measured", 1.0)], + ); + + concentrator.add_span(&first); + concentrator.add_span(&second); + + let buckets = concentrator.flush(SystemTime::now(), true); + assert!(!buckets.is_empty()); + let stats = &buckets[0].stats; + + let overflow = stats + .iter() + .find(|g| g.resource == TRACER_BLOCKED_VALUE) + .expect("overflow group must exist"); + + // Every string dimension must be the sentinel. 
+ assert_eq!( + overflow.service, TRACER_BLOCKED_VALUE, + "service must be sentinel" + ); + assert_eq!(overflow.name, TRACER_BLOCKED_VALUE, "name must be sentinel"); + assert_eq!( + overflow.resource, TRACER_BLOCKED_VALUE, + "resource must be sentinel" + ); + assert_eq!( + overflow.r#type, TRACER_BLOCKED_VALUE, + "type must be sentinel" + ); + assert_eq!( + overflow.span_kind, TRACER_BLOCKED_VALUE, + "span_kind must be sentinel" + ); + assert_eq!( + overflow.http_method, TRACER_BLOCKED_VALUE, + "http_method must be sentinel" + ); + assert_eq!( + overflow.http_endpoint, TRACER_BLOCKED_VALUE, + "http_endpoint must be sentinel" + ); + assert_eq!( + overflow.service_source, TRACER_BLOCKED_VALUE, + "service_source must be sentinel" + ); + // Numeric and boolean fields must be zero/false (NOT_SET per RFC). + assert_eq!(overflow.http_status_code, 0, "http_status_code must be 0"); + assert_eq!( + overflow.grpc_status_code, "", + "grpc_status_code must be empty" + ); + assert!(!overflow.synthetics, "synthetics must be false"); + // is_trace_root uses Trilean; NOT_SET maps to 0. + assert_eq!( + overflow.is_trace_root, 0, + "is_trace_root must be NOT_SET (0)" + ); + assert!(overflow.peer_tags.is_empty(), "peer_tags must be empty"); + + // The normal group must be unaffected. 
+ let normal = stats + .iter() + .find(|g| g.resource != TRACER_BLOCKED_VALUE) + .expect("normal group must exist"); + assert_eq!(normal.service, "my-service"); + assert_eq!(normal.resource, "my-resource"); +} diff --git a/libdd-trace-stats/src/stats_exporter.rs b/libdd-trace-stats/src/stats_exporter.rs index c91a6de4c9..75def29cf7 100644 --- a/libdd-trace-stats/src/stats_exporter.rs +++ b/libdd-trace-stats/src/stats_exporter.rs @@ -309,6 +309,7 @@ mod tests { SystemTime::now() - BUCKETS_DURATION * 3, vec![], vec![], + None, #[cfg(feature = "stats-obfuscation")] None, ); From 55253bcb9cd4fa2863f426cf0becb374c83e70e8 Mon Sep 17 00:00:00 2001 From: vianney Date: Wed, 24 Jun 2026 15:59:35 +0200 Subject: [PATCH 4/4] feat(stats): send telemetry for cardinality limits --- Cargo.lock | 2 + datadog-ipc/src/shm_stats.rs | 4 +- datadog-sidecar/Cargo.toml | 2 +- datadog-sidecar/src/service/stats_flusher.rs | 2 + libdd-data-pipeline/Cargo.toml | 4 +- .../src/trace_exporter/stats.rs | 3 + libdd-trace-stats/Cargo.toml | 4 + .../src/span_concentrator/mod.rs | 15 +- .../src/span_concentrator/tests.rs | 42 +-- libdd-trace-stats/src/stats_exporter.rs | 301 +++++++++++++++++- 10 files changed, 340 insertions(+), 39 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1b328536d5..da09a95e70 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3433,7 +3433,9 @@ dependencies = [ "libdd-capabilities-impl", "libdd-common", "libdd-ddsketch", + "libdd-dogstatsd-client", "libdd-shared-runtime", + "libdd-telemetry", "libdd-trace-obfuscation", "libdd-trace-protobuf", "libdd-trace-utils", diff --git a/datadog-ipc/src/shm_stats.rs b/datadog-ipc/src/shm_stats.rs index 96497d1169..afdadfd8ec 100644 --- a/datadog-ipc/src/shm_stats.rs +++ b/datadog-ipc/src/shm_stats.rs @@ -819,8 +819,8 @@ impl ShmSpanConcentrator { } impl FlushableConcentrator for ShmSpanConcentrator { - fn flush_buckets(&mut self, force: bool) -> Vec { - self.drain_buckets(force) + fn flush_buckets(&mut self, force: bool) -> (Vec, u64) 
{ + (self.drain_buckets(force), 0) } } diff --git a/datadog-sidecar/Cargo.toml b/datadog-sidecar/Cargo.toml index 443b292b32..b3f8e4b28b 100644 --- a/datadog-sidecar/Cargo.toml +++ b/datadog-sidecar/Cargo.toml @@ -26,7 +26,7 @@ datadog-sidecar-macros = { path = "../datadog-sidecar-macros" } libdd-telemetry = { path = "../libdd-telemetry", features = ["tracing"] } libdd-data-pipeline = { path = "../libdd-data-pipeline" } libdd-trace-utils = { path = "../libdd-trace-utils" } -libdd-trace-stats = { path = "../libdd-trace-stats", default-features=false, features = ["https"] } +libdd-trace-stats = { path = "../libdd-trace-stats", features = ["telemetry", "dogstatsd"] } libdd-remote-config = { path = "../libdd-remote-config" } datadog-live-debugger = { path = "../datadog-live-debugger" } datadog-ffe = { path = "../datadog-ffe", features = ["exposure-events", "evaluation-metrics"] } diff --git a/datadog-sidecar/src/service/stats_flusher.rs b/datadog-sidecar/src/service/stats_flusher.rs index 86bfd9f395..f1cdc169da 100644 --- a/datadog-sidecar/src/service/stats_flusher.rs +++ b/datadog-sidecar/src/service/stats_flusher.rs @@ -118,6 +118,8 @@ fn make_exporter( )), #[cfg(feature = "stats-obfuscation")] "0", + None, + None, ) } diff --git a/libdd-data-pipeline/Cargo.toml b/libdd-data-pipeline/Cargo.toml index 00578c4eeb..10ca5b5a44 100644 --- a/libdd-data-pipeline/Cargo.toml +++ b/libdd-data-pipeline/Cargo.toml @@ -37,7 +37,7 @@ libdd-shared-runtime = { version = "1.0.0", path = "../libdd-shared-runtime", de libdd-telemetry = { version = "5.0.1", path = "../libdd-telemetry", default-features = false, optional = true} libdd-trace-protobuf = { version = "3.0.2", path = "../libdd-trace-protobuf" } libdd-trace-normalization = { version = "2.0.0", path = "../libdd-trace-normalization" } -libdd-trace-stats = { version = "5.0.0", path = "../libdd-trace-stats", default-features = false } +libdd-trace-stats = { version = "5.0.0", path = "../libdd-trace-stats", default-features = 
false, features = ["dogstatsd"] } libdd-trace-utils = { version = "8.0.0", path = "../libdd-trace-utils", default-features = false } libdd-trace-obfuscation = { version = "4.0.0", path = "../libdd-trace-obfuscation", default-features = false, optional = true } libdd-ddsketch = { version = "1.0.1", path = "../libdd-ddsketch" } @@ -85,7 +85,7 @@ duplicate = "2.0.1" [features] default = ["https", "telemetry"] -telemetry = ["libdd-telemetry"] +telemetry = ["libdd-telemetry", "libdd-trace-stats/telemetry"] https = [ "libdd-common/https", "libdd-capabilities-impl/https", diff --git a/libdd-data-pipeline/src/trace_exporter/stats.rs b/libdd-data-pipeline/src/trace_exporter/stats.rs index b97cf1c296..e5bb28cd3f 100644 --- a/libdd-data-pipeline/src/trace_exporter/stats.rs +++ b/libdd-data-pipeline/src/trace_exporter/stats.rs @@ -164,6 +164,9 @@ fn create_and_start_stats_worker< client_side_stats.obfuscation_config.clone(), #[cfg(feature = "stats-obfuscation")] SUPPORTED_OBFUSCATION_VERSION_STR, + #[cfg(feature = "telemetry")] + None, + None, ); let worker_handle = ctx .shared_runtime diff --git a/libdd-trace-stats/Cargo.toml b/libdd-trace-stats/Cargo.toml index c81cc08c57..284e5d7f20 100644 --- a/libdd-trace-stats/Cargo.toml +++ b/libdd-trace-stats/Cargo.toml @@ -15,7 +15,9 @@ anyhow = "1.0" libdd-capabilities = { path = "../libdd-capabilities", version = "2.0.0" } libdd-common = { version = "4.2.0", path = "../libdd-common", default-features = false } libdd-ddsketch = { version = "1.0.1", path = "../libdd-ddsketch" } +libdd-dogstatsd-client = { version = "3.0.0", path = "../libdd-dogstatsd-client", optional = true } libdd-shared-runtime = { version = "1.0.0", path = "../libdd-shared-runtime", default-features = false } +libdd-telemetry = { version = "5.0.1", path = "../libdd-telemetry", optional = true } libdd-trace-protobuf = { version = "3.0.2", path = "../libdd-trace-protobuf" } libdd-trace-obfuscation = { version = "4.0.0", path = "../libdd-trace-obfuscation", 
default-features = false } libdd-trace-utils = { version = "8.0.0", path = "../libdd-trace-utils", default-features = false } @@ -49,5 +51,7 @@ tokio = { version = "1.23", features = ["rt-multi-thread", "macros", "test-util" [features] default = ["https"] stats-obfuscation = [] +telemetry = ["libdd-telemetry"] +dogstatsd = ["libdd-dogstatsd-client"] https = ["libdd-common/https", "libdd-capabilities-impl/https", "libdd-shared-runtime/https"] fips = ["libdd-common/fips", "libdd-capabilities-impl/fips", "libdd-shared-runtime/fips"] diff --git a/libdd-trace-stats/src/span_concentrator/mod.rs b/libdd-trace-stats/src/span_concentrator/mod.rs index 8a9a30d3de..50b93dd1a3 100644 --- a/libdd-trace-stats/src/span_concentrator/mod.rs +++ b/libdd-trace-stats/src/span_concentrator/mod.rs @@ -20,11 +20,13 @@ pub use stat_span::StatSpan; /// `StatsExporter` is generic over `C: FlushableConcentrator` so it can work with /// both the in-process [`SpanConcentrator`] and the SHM-backed `ShmSpanConcentrator`. pub trait FlushableConcentrator { - fn flush_buckets(&mut self, force: bool) -> Vec; + /// Flush time buckets and return them together with the number of spans that were + /// collapsed into the overflow sentinel bucket due to cardinality limiting. + fn flush_buckets(&mut self, force: bool) -> (Vec, u64); } impl FlushableConcentrator for SpanConcentrator { - fn flush_buckets(&mut self, force: bool) -> Vec { + fn flush_buckets(&mut self, force: bool) -> (Vec, u64) { self.flush(SystemTime::now(), force) } } @@ -221,7 +223,11 @@ impl SpanConcentrator { /// Flush all stats bucket except for the `buffer_len` most recent. If `force` is true, flush /// all buckets. - pub fn flush(&mut self, now: SystemTime, force: bool) -> Vec { + /// + /// Returns a tuple of `(buckets, collapsed_spans)` where `collapsed_spans` is the total number + /// of spans that were collapsed into the overflow sentinel bucket due to cardinality limiting + /// across all flushed time buckets. 
+ pub fn flush(&mut self, now: SystemTime, force: bool) -> (Vec, u64) { // TODO: Wait for HashMap::extract_if to be stabilized to avoid a full drain let now_timestamp = system_time_to_unix_duration(now).as_nanos() as u64; let buckets: Vec<(u64, StatsBucket)> = self.buckets.drain().collect(); @@ -251,8 +257,7 @@ impl SpanConcentrator { Some(bucket.flush(self.bucket_size)) }) .collect(); - //TODO send telemetry - buckets_pb + (buckets_pb, total_collapsed) } } diff --git a/libdd-trace-stats/src/span_concentrator/tests.rs b/libdd-trace-stats/src/span_concentrator/tests.rs index 11f43e1daf..e4a1167454 100644 --- a/libdd-trace-stats/src/span_concentrator/tests.rs +++ b/libdd-trace-stats/src/span_concentrator/tests.rs @@ -126,12 +126,12 @@ fn test_concentrator_oldest_timestamp_cold() { // Assert we didn't insert spans in older buckets for _ in 0..concentrator.buffer_len { - let stats = concentrator.flush(flushtime, false); + let (stats, _) = concentrator.flush(flushtime, false); assert_eq!(stats.len(), 0, "We should get 0 time buckets"); flushtime += Duration::from_nanos(concentrator.bucket_size); } - let stats = concentrator.flush(flushtime, false); + let (stats, _) = concentrator.flush(flushtime, false); assert_eq!(stats.len(), 1, "We should get exactly one time bucket"); @@ -188,12 +188,12 @@ fn test_concentrator_oldest_timestamp_hot() { } for _ in 0..(concentrator.buffer_len - 1) { - let stats = concentrator.flush(flushtime, false); + let (stats, _) = concentrator.flush(flushtime, false); assert!(stats.is_empty(), "We should get 0 time buckets"); flushtime += Duration::from_nanos(concentrator.bucket_size); } - let stats = concentrator.flush(flushtime, false); + let (stats, _) = concentrator.flush(flushtime, false); assert_eq!(stats.len(), 1, "We should get exactly one time bucket"); // First oldest bucket aggregates, it should have it all except the @@ -213,7 +213,7 @@ fn test_concentrator_oldest_timestamp_hot() { assert_counts_equal(expected, 
stats.first().unwrap().stats.clone()); flushtime += Duration::from_nanos(concentrator.bucket_size); - let stats = concentrator.flush(flushtime, false); + let (stats, _) = concentrator.flush(flushtime, false); assert_eq!(stats.len(), 1, "We should get exactly one time bucket"); // Stats of the last four spans. @@ -278,7 +278,7 @@ fn test_concentrator_stats_totals() { let mut flushtime = now; for _ in 0..=concentrator.buffer_len { - let stats = concentrator.flush(flushtime, false); + let (stats, _) = concentrator.flush(flushtime, false); if stats.is_empty() { continue; } @@ -528,7 +528,7 @@ fn test_concentrator_stats_counts() { let mut flushtime = now; for _ in 0..=concentrator.buffer_len + 2 { - let stats = concentrator.flush(flushtime, false); + let (stats, _) = concentrator.flush(flushtime, false); let expected_flushed_timestamps = align_timestamp( system_time_to_unix_duration(flushtime).as_nanos() as u64, concentrator.bucket_size, @@ -553,7 +553,7 @@ fn test_concentrator_stats_counts() { stats.first().unwrap().stats.clone(), ); - let stats = concentrator.flush(flushtime, false); + let (stats, _) = concentrator.flush(flushtime, false); assert_eq!( stats.len(), 0, @@ -658,7 +658,7 @@ fn test_span_should_be_included_in_stats() { }, ]; - let stats = concentrator.flush( + let (stats, _) = concentrator.flush( now + Duration::from_nanos(concentrator.bucket_size * concentrator.buffer_len as u64), false, ); @@ -696,7 +696,7 @@ fn test_ignore_partial_spans() { concentrator.add_span(span); } - let stats = concentrator.flush( + let (stats, _) = concentrator.flush( now + Duration::from_nanos(concentrator.bucket_size * concentrator.buffer_len as u64), false, ); @@ -727,10 +727,10 @@ fn test_force_flush() { let flushtime = now - Duration::from_secs(3600); // Bucket should not be flushed without force flush - let stats = concentrator.flush(flushtime, false); + let (stats, _) = concentrator.flush(flushtime, false); assert_eq!(0, stats.len()); - let stats = 
concentrator.flush(flushtime, true); + let (stats, _) = concentrator.flush(flushtime, true); assert_eq!(1, stats.len()); } @@ -900,7 +900,7 @@ fn test_peer_tags_aggregation() { }, ]; - let stats_with_peer_tags = concentrator_with_peer_tags.flush(flushtime, false); + let (stats_with_peer_tags, _) = concentrator_with_peer_tags.flush(flushtime, false); assert_counts_equal( expected_with_peer_tags, stats_with_peer_tags @@ -910,7 +910,7 @@ fn test_peer_tags_aggregation() { .clone(), ); - let stats_without_peer_tags = concentrator_without_peer_tags.flush(flushtime, false); + let (stats_without_peer_tags, _) = concentrator_without_peer_tags.flush(flushtime, false); assert_counts_equal( expected_without_peer_tags, stats_without_peer_tags @@ -1023,7 +1023,7 @@ fn test_peer_tags_quantization_aggregation() { ..Default::default() }]; - let stats_with_peer_tags = concentrator_with_peer_tags.flush(flushtime, false); + let (stats_with_peer_tags, _) = concentrator_with_peer_tags.flush(flushtime, false); assert_counts_equal( expected_with_peer_tags, stats_with_peer_tags @@ -1193,7 +1193,7 @@ fn test_base_service_peer_tag() { }, ]; - let stats = concentrator.flush(flushtime, false); + let (stats, _) = concentrator.flush(flushtime, false); assert_counts_equal( expected, stats @@ -1497,7 +1497,7 @@ fn test_pb_span() { // Flush and get stats let flushtime = now + Duration::from_nanos(concentrator.bucket_size * concentrator.buffer_len as u64); - let stats = concentrator.flush(flushtime, false); + let (stats, _) = concentrator.flush(flushtime, false); assert_eq!(stats.len(), 1, "Should get exactly one time bucket"); let bucket = &stats[0]; @@ -1622,7 +1622,7 @@ fn test_cardinality_limit_collapse() { concentrator.add_span(&span); } - let buckets = concentrator.flush(SystemTime::now(), true); + let (buckets, _) = concentrator.flush(SystemTime::now(), true); assert!(!buckets.is_empty(), "should get at least one time bucket"); let stats = &buckets[0].stats; @@ -1680,7 +1680,7 @@ fn 
test_overflow_bucket_counts() { concentrator.add_span(&span); } - let buckets = concentrator.flush(SystemTime::now(), true); + let (buckets, _) = concentrator.flush(SystemTime::now(), true); assert!(!buckets.is_empty()); let stats = &buckets[0].stats; @@ -1729,7 +1729,7 @@ fn test_no_collapse_within_limit() { concentrator.add_span(&span); } - let buckets = concentrator.flush(SystemTime::now(), true); + let (buckets, _) = concentrator.flush(SystemTime::now(), true); assert!(!buckets.is_empty()); let stats = &buckets[0].stats; @@ -1781,7 +1781,7 @@ fn test_overflow_bucket_key_sentinel_values() { concentrator.add_span(&first); concentrator.add_span(&second); - let buckets = concentrator.flush(SystemTime::now(), true); + let (buckets, _) = concentrator.flush(SystemTime::now(), true); assert!(!buckets.is_empty()); let stats = &buckets[0].stats; diff --git a/libdd-trace-stats/src/stats_exporter.rs b/libdd-trace-stats/src/stats_exporter.rs index 75def29cf7..6328be02ac 100644 --- a/libdd-trace-stats/src/stats_exporter.rs +++ b/libdd-trace-stats/src/stats_exporter.rs @@ -25,6 +25,11 @@ use tracing::error; pub const STATS_ENDPOINT_PATH: &str = "/v0.6/stats"; +/// Metric name for the number of spans collapsed into the overflow sentinel bucket due to +/// per-bucket cardinality limiting. Emitted on both the DogStatsD health-metrics channel +/// and the instrumentation-telemetry channel. +pub const COLLAPSED_SPANS_METRIC: &str = "datadog.tracer.stats.collapsed_spans"; + /// Metadata needed by the stats exporter to annotate payloads and HTTP requests. #[derive(Clone, Default, Debug)] pub struct StatsMetadata { @@ -93,6 +98,16 @@ pub struct StatsExporter< obfuscation_config: SharedStatsComputationObfuscationConfig, #[cfg(feature = "stats-obfuscation")] supported_obfuscation_version: &'static str, + /// Optional telemetry handle and its pre-registered context key, used to emit + /// `COLLAPSED_SPANS_METRIC`. 
+ #[cfg(feature = "telemetry")] + telemetry: Option<( + libdd_telemetry::worker::TelemetryWorkerHandle, + libdd_telemetry::metrics::ContextKey, + )>, + /// Optional DogStatsD client used to emit `COLLAPSED_SPANS_METRIC`. + #[cfg(feature = "dogstatsd")] + dogstatsd: Option, } impl @@ -105,6 +120,7 @@ impl /// agent /// - `meta` metadata used in ClientStatsPayload and as headers to send stats to the agent /// - `endpoint` the Endpoint used to send stats to the agent + #[allow(clippy::too_many_arguments)] pub fn new( flush_interval: time::Duration, concentrator: Arc>, @@ -114,7 +130,22 @@ impl #[cfg(feature = "stats-obfuscation")] obfuscation_config: SharedStatsComputationObfuscationConfig, #[cfg(feature = "stats-obfuscation")] supported_obfuscation_version: &'static str, + #[cfg(feature = "telemetry")] telemetry: Option< + libdd_telemetry::worker::TelemetryWorkerHandle, + >, + #[cfg(feature = "dogstatsd")] dogstatsd: Option, ) -> Self { + #[cfg(feature = "telemetry")] + let telemetry = telemetry.map(|handle| { + let key = handle.register_metric_context( + COLLAPSED_SPANS_METRIC.to_string(), + vec![], + libdd_telemetry::data::metrics::MetricType::Count, + true, + libdd_telemetry::data::metrics::MetricNamespace::Tracers, + ); + (handle, key) + }); Self { flush_interval, concentrator, @@ -126,6 +157,10 @@ impl obfuscation_config, #[cfg(feature = "stats-obfuscation")] supported_obfuscation_version, + #[cfg(feature = "telemetry")] + telemetry, + #[cfg(feature = "dogstatsd")] + dogstatsd, } } @@ -146,7 +181,23 @@ impl /// case stats cannot be flushed since the concentrator might be corrupted. /// Returns `Ok(true)` if stats were sent, `Ok(false)` if the concentrator had nothing to send. 
pub async fn send(&self, force_flush: bool) -> anyhow::Result { - let payload = self.flush(force_flush); + let (payload, collapsed_spans) = self.flush(force_flush); + + if collapsed_spans > 0 { + #[cfg(feature = "telemetry")] + if let Some((handle, key)) = &self.telemetry { + let _ = handle.add_point(collapsed_spans as f64, key, vec![]); + } + #[cfg(feature = "dogstatsd")] + if let Some(client) = &self.dogstatsd { + client.send(vec![libdd_dogstatsd_client::DogStatsDAction::Count( + COLLAPSED_SPANS_METRIC, + collapsed_spans as i64, + [].iter(), + )]); + } + } + if payload.stats.is_empty() { return Ok(false); } @@ -194,14 +245,13 @@ impl /// # Panic /// Will panic if another thread panicked while holding the concentrator lock in which /// case stats cannot be flushed since the concentrator might be corrupted. - fn flush(&self, force_flush: bool) -> pb::ClientStatsPayload { + fn flush(&self, force_flush: bool) -> (pb::ClientStatsPayload, u64) { let sequence = self.sequence_id.fetch_add(1, Ordering::Relaxed); - encode_stats_payload( - &self.meta, - sequence, - #[allow(clippy::unwrap_used)] - self.concentrator.lock().unwrap().flush_buckets(force_flush), - ) + #[allow(clippy::unwrap_used)] + let (buckets, collapsed_spans) = + self.concentrator.lock().unwrap().flush_buckets(force_flush); + let payload = encode_stats_payload(&self.meta, sequence, buckets); + (payload, collapsed_spans) } } @@ -357,6 +407,10 @@ mod tests { StatsComputationObfuscationConfig::disabled(), #[cfg(feature = "stats-obfuscation")] "1", + #[cfg(feature = "telemetry")] + None, + #[cfg(feature = "dogstatsd")] + None, ); let send_status = stats_exporter.send(true).await; @@ -388,6 +442,10 @@ mod tests { StatsComputationObfuscationConfig::disabled(), #[cfg(feature = "stats-obfuscation")] "1", + #[cfg(feature = "telemetry")] + None, + #[cfg(feature = "dogstatsd")] + None, ); let send_status = stats_exporter.send(true).await; @@ -427,6 +485,10 @@ mod tests { StatsComputationObfuscationConfig::disabled(), 
#[cfg(feature = "stats-obfuscation")] "1", + #[cfg(feature = "telemetry")] + None, + #[cfg(feature = "dogstatsd")] + None, ); let _handle = shared_runtime .spawn_worker(stats_exporter, true) @@ -472,6 +534,10 @@ mod tests { StatsComputationObfuscationConfig::disabled(), #[cfg(feature = "stats-obfuscation")] "1", + #[cfg(feature = "telemetry")] + None, + #[cfg(feature = "dogstatsd")] + None, ); let _handle = shared_runtime @@ -543,6 +609,10 @@ mod tests { })), #[cfg(feature = "stats-obfuscation")] "1", + #[cfg(feature = "telemetry")] + None, + #[cfg(feature = "dogstatsd")] + None, ); let send_status = stats_exporter.send(true).await; @@ -550,4 +620,219 @@ mod tests { mock.assert_async().await; } + + /// Build a concentrator with `max_entries_per_bucket = 1` pre-seeded with four distinct spans + /// so that three spans are collapsed into the overflow bucket. + fn get_collapsed_concentrator() -> SpanConcentrator { + use libdd_trace_utils::span::{trace_utils, v04::SpanSlice}; + + let mut concentrator = SpanConcentrator::new( + BUCKETS_DURATION, + SystemTime::now(), + vec![], + vec![], + Some(1), // max 1 distinct key → second span collapses + #[cfg(feature = "stats-obfuscation")] + None, + ); + + let mut trace = vec![ + SpanSlice { + service: "svc", + resource: "resource-a", + duration: 10, + ..Default::default() + }, + SpanSlice { + service: "svc", + resource: "resource-b", + duration: 20, + ..Default::default() + }, + SpanSlice { + service: "svc", + resource: "resource-c", + duration: 20, + ..Default::default() + }, + SpanSlice { + service: "svc", + resource: "resource-d", + duration: 20, + ..Default::default() + }, + ]; + trace_utils::compute_top_level_span(trace.as_mut_slice()); + for span in &trace { + concentrator.add_span(span); + } + concentrator + } + + /// Verify that when `collapsed_spans == 0` the DogStatsD socket receives nothing.
+ #[cfg(feature = "dogstatsd")] + #[cfg_attr(miri, ignore)] + #[tokio::test] + async fn test_no_emission_when_zero() { + use std::net; + + let server = MockServer::start_async().await; + server + .mock_async(|_when, then| { + then.status(200).body(""); + }) + .await; + + // Bind a UDP socket so we can detect whether anything arrives. + let socket = net::UdpSocket::bind("127.0.0.1:0").expect("failed to bind UDP socket"); + socket + .set_read_timeout(Some(std::time::Duration::from_millis(200))) + .unwrap(); + let addr = socket.local_addr().unwrap().to_string(); + + let dogstatsd_client = + libdd_dogstatsd_client::new(libdd_common::Endpoint::from_slice(&addr)) + .expect("failed to create dogstatsd client"); + + // get_test_concentrator() has no cardinality collapse: collapsed_spans will be 0. + let stats_exporter = StatsExporter::::new( + BUCKETS_DURATION, + Arc::new(Mutex::new(get_test_concentrator())), + get_test_metadata(), + Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()), + NativeCapabilities::new_client(), + #[cfg(feature = "stats-obfuscation")] + StatsComputationObfuscationConfig::disabled(), + #[cfg(feature = "stats-obfuscation")] + "1", + #[cfg(feature = "telemetry")] + None, + Some(dogstatsd_client), + ); + + stats_exporter.send(true).await.unwrap(); + + // The socket must not have received any datagram. + let mut buf = [0u8; 256]; + let result = socket.recv(&mut buf); + assert!( + result.is_err(), + "No DogStatsD datagram expected when collapsed_spans == 0" + ); + } + + /// Verify that `COLLAPSED_SPANS_METRIC` is emitted to DogStatsD when spans are collapsed. 
+ #[cfg(feature = "dogstatsd")] + #[cfg_attr(miri, ignore)] + #[tokio::test] + async fn test_collapsed_spans_dogstatsd() { + use std::net; + + let server = MockServer::start_async().await; + server + .mock_async(|_when, then| { + then.status(200).body(""); + }) + .await; + + let socket = net::UdpSocket::bind("127.0.0.1:0").expect("failed to bind UDP socket"); + socket + .set_read_timeout(Some(std::time::Duration::from_millis(500))) + .unwrap(); + let addr = socket.local_addr().unwrap().to_string(); + + let dogstatsd_client = + libdd_dogstatsd_client::new(libdd_common::Endpoint::from_slice(&addr)) + .expect("failed to create dogstatsd client"); + + let stats_exporter = StatsExporter::::new( + BUCKETS_DURATION, + Arc::new(Mutex::new(get_collapsed_concentrator())), + get_test_metadata(), + Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()), + NativeCapabilities::new_client(), + #[cfg(feature = "stats-obfuscation")] + StatsComputationObfuscationConfig::disabled(), + #[cfg(feature = "stats-obfuscation")] + "1", + #[cfg(feature = "telemetry")] + None, + Some(dogstatsd_client), + ); + + stats_exporter.send(true).await.unwrap(); + + let mut buf = [0u8; 256]; + let n = socket + .recv(&mut buf) + .expect("expected a DogStatsD datagram"); + let datagram = std::str::from_utf8(&buf[..n]).expect("valid utf-8"); + assert_eq!( + datagram, "datadog.tracer.stats.collapsed_spans:3|c", + "DogStatsD datagram must match the expected format" + ); + } + + /// Verify that `COLLAPSED_SPANS_METRIC` is enqueued to the telemetry worker when spans + /// are collapsed. This does not verify the actual value of the metric. 
+ #[cfg(feature = "telemetry")] + #[cfg_attr(miri, ignore)] + #[tokio::test] + async fn test_collapsed_spans_telemetry() { + use libdd_telemetry::worker::TelemetryWorkerBuilder; + + let server = MockServer::start_async().await; + server + .mock_async(|_when, then| { + then.status(200).body(""); + }) + .await; + + let (handle, _join_handle) = TelemetryWorkerBuilder::new( + "test-host".to_string(), + "test-service".to_string(), + "rust".to_string(), + "1.0".to_string(), + "0.0.0".to_string(), + ) + .spawn(); + + let stats_exporter = StatsExporter::::new( + BUCKETS_DURATION, + Arc::new(Mutex::new(get_collapsed_concentrator())), + get_test_metadata(), + Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()), + NativeCapabilities::new_client(), + #[cfg(feature = "stats-obfuscation")] + StatsComputationObfuscationConfig::disabled(), + #[cfg(feature = "stats-obfuscation")] + "1", + #[cfg(feature = "telemetry")] + Some(handle), + #[cfg(feature = "dogstatsd")] + None, + ); + + stats_exporter.send(true).await.unwrap(); + + let stats_exporter_ref = &stats_exporter; + let (handle_ref, _key) = stats_exporter_ref + .telemetry + .as_ref() + .expect("telemetry must be set"); + let receiver = handle_ref.stats().expect("failed to request stats"); + let stats = receiver.await.expect("failed to receive stats"); + // metric_contexts == 1 verifies that exactly one metric name was registered + // (i.e. COLLAPSED_SPANS_METRIC and nothing else). + // metric_buckets.buckets == 1 verifies that a data point was recorded for it. + // However it does not check the value of the data point. + assert_eq!( + stats.metric_contexts, 1, + "exactly one metric context (COLLAPSED_SPANS_METRIC) should be registered" + ); + assert_eq!( + stats.metric_buckets.buckets, 1, + "exactly one metric bucket expected after one collapsed-spans emission" + ); + } }