diff --git a/Cargo.lock b/Cargo.lock
index 27fa4393f2..657592d5b1 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3063,9 +3063,11 @@ name = "libdd-dogstatsd-client"
 version = "3.0.0"
 dependencies = [
  "anyhow",
+ "async-trait",
  "cadence",
  "http",
  "libdd-common",
+ "libdd-shared-runtime",
  "serde",
  "tokio",
  "tracing",
diff --git a/libdd-dogstatsd-client/Cargo.toml b/libdd-dogstatsd-client/Cargo.toml
index 4c644be494..bcaa5027da 100644
--- a/libdd-dogstatsd-client/Cargo.toml
+++ b/libdd-dogstatsd-client/Cargo.toml
@@ -18,11 +18,15 @@
 serde = { version = "1.0", features = ["derive", "rc"] }
 tracing = { version = "0.1", default-features = false }
 anyhow = { version = "1.0" }
 http = "1.1"
+libdd-shared-runtime = { version = "1.0.0", path = "../libdd-shared-runtime", optional = true }
+tokio = { version = "1.23", features = ["sync"], optional = true }
+async-trait = { version = "0.1", optional = true }
 
 [features]
-default = ["https"]
+default = ["https", "shared-runtime"]
 https = ["libdd-common/https"]
 fips = ["libdd-common/fips"]
+shared-runtime = ["dep:libdd-shared-runtime", "dep:tokio", "dep:async-trait"]
 
 [dev-dependencies]
diff --git a/libdd-dogstatsd-client/src/action.rs b/libdd-dogstatsd-client/src/action.rs
new file mode 100644
index 0000000000..b2482f8087
--- /dev/null
+++ b/libdd-dogstatsd-client/src/action.rs
@@ -0,0 +1,80 @@
+use libdd_common::tag::Tag;
+use serde::{Deserialize, Serialize};
+use std::fmt::Debug;
+
+/// The `DogStatsDActionOwned` enum gathers the metric types that can be sent to the DogStatsD
+/// server. This type takes ownership of the relevant data to support the sidecar better.
+/// For documentation on the dogstatsd metric types: <https://docs.datadoghq.com/metrics/types/?tab=count#metric-types>
+///
+/// Originally I attempted to combine this type with `DogStatsDAction` but this GREATLY complicates
+/// the types to the point of insanity. I was unable to come up with a satisfactory approach that
+/// allows both the data-pipeline and sidecar crates to use the same type. If a future rustacean
+/// wants to take a stab and open a PR please do so!
+#[derive(Debug, Serialize, Deserialize)]
+pub enum DogStatsDActionOwned {
+    #[allow(missing_docs)]
+    Count(String, i64, Vec<Tag>),
+    #[allow(missing_docs)]
+    Distribution(String, f64, Vec<Tag>),
+    #[allow(missing_docs)]
+    Gauge(String, f64, Vec<Tag>),
+    #[allow(missing_docs)]
+    Histogram(String, f64, Vec<Tag>),
+    /// Cadence only supports the i64 type as value,
+    /// but the Golang implementation uses string (<https://github.com/DataDog/datadog-go/blob/331d24832f7eac97b091efd696278fe2c4192b29/statsd/statsd.go#L230>)
+    /// and the PHP implementation uses float or string (<https://github.com/DataDog/php-datadogstatsd/blob/0efdd1c38f6d3dd407efbb899ad1fd2e5cd18085/src/DogStatsd.php#L251>)
+    Set(String, i64, Vec<Tag>),
+}
+
+/// The `DogStatsDAction` enum gathers the metric types that can be sent to the DogStatsD server.
+#[derive(Debug, Serialize, Deserialize)]
+pub enum DogStatsDAction<'a, T: AsRef<str>, V: IntoIterator<Item = &'a Tag>> {
+    // TODO: instead of AsRef<str> we can accept a marker Trait that users of this crate implement
+    #[allow(missing_docs)]
+    Count(T, i64, V),
+    #[allow(missing_docs)]
+    Distribution(T, f64, V),
+    #[allow(missing_docs)]
+    Gauge(T, f64, V),
+    #[allow(missing_docs)]
+    Histogram(T, f64, V),
+    /// Cadence only supports the i64 type as value,
+    /// but the Golang implementation uses string (<https://github.com/DataDog/datadog-go/blob/331d24832f7eac97b091efd696278fe2c4192b29/statsd/statsd.go#L230>)
+    /// and the PHP implementation uses float or string (<https://github.com/DataDog/php-datadogstatsd/blob/0efdd1c38f6d3dd407efbb899ad1fd2e5cd18085/src/DogStatsd.php#L251>)
+    Set(T, i64, V),
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_owned_sync() {
+        // This test ensures that if a new variant is added to either `DogStatsDActionOwned` or
+        // `DogStatsDAction` this test will NOT COMPILE to act as a reminder that BOTH locations
+        // must be updated.
+        let owned_act = DogStatsDActionOwned::Count("test".to_string(), 1, vec![]);
+        match owned_act {
+            DogStatsDActionOwned::Count(_, _, _) => {}
+            DogStatsDActionOwned::Distribution(_, _, _) => {}
+            DogStatsDActionOwned::Gauge(_, _, _) => {}
+            DogStatsDActionOwned::Histogram(_, _, _) => {}
+            DogStatsDActionOwned::Set(_, _, _) => {}
+        }
+
+        let act = DogStatsDAction::Count("test".to_string(), 1, vec![]);
+        match act {
+            DogStatsDAction::Count(_, _, _) => {}
+            DogStatsDAction::Distribution(_, _, _) => {}
+            DogStatsDAction::Gauge(_, _, _) => {}
+            DogStatsDAction::Histogram(_, _, _) => {}
+            DogStatsDAction::Set(_, _, _) => {}
+        }
+        // TODO: when std::mem::variant_count is in stable we can do this instead
+        // assert_eq!(
+        //     std::mem::variant_count::<DogStatsDActionOwned>(),
+        //     std::mem::variant_count::<DogStatsDAction<String, Vec<&Tag>>>(),
+        //     "DogStatsDActionOwned and DogStatsDAction should have the same number of variants,
+        // did you forget to update one?", );
+    }
+}
diff --git a/libdd-dogstatsd-client/src/client/mod.rs b/libdd-dogstatsd-client/src/client/mod.rs
new file mode 100644
index 0000000000..33a344ca35
--- /dev/null
+++ b/libdd-dogstatsd-client/src/client/mod.rs
@@ -0,0 +1,324 @@
+use anyhow::anyhow;
+use cadence::prelude::*;
+use cadence::{Metric, MetricBuilder, QueuingMetricSink, StatsdClient};
+use libdd_common::tag::Tag;
+use libdd_common::Endpoint;
+#[cfg(feature = "shared-runtime")]
+use libdd_shared_runtime::{SharedRuntime, WorkerHandle};
+use std::fmt::Debug;
+use std::sync::{Arc, Mutex};
+use tracing::error;
+
+use crate::action::{DogStatsDAction, DogStatsDActionOwned};
+
+// Queue with a maximum capacity of 32K elements
+const QUEUE_SIZE: usize = 32 * 1024;
+
+mod sink;
+
+#[cfg(feature = "shared-runtime")]
+mod shared_runtime_sink;
+
+/// A dogstatsd-client that flushes stats to a given endpoint.
+#[derive(Debug, Default)]
+pub struct Client {
+    /// The StatsD client, `None` inside the `Arc` until one has been built (lazily on first send,
+    /// or eagerly by `new_with_shared_runtime`).
+    pub(crate) client: Mutex<Arc<Option<StatsdClient>>>,
+    /// Endpoint metrics are sent to. `get_or_init_client` only hands out a client when this is set.
+    pub(crate) endpoint: Option<Endpoint>,
+}
+
+/// Build a new flusher instance pointed at the provided endpoint.
+///
+/// This currently always returns `Ok`: it only wraps [`Client::new`], and the endpoint is not
+/// used until the first metric is sent. An invalid endpoint is reported (logged) at send time.
+pub fn new(endpoint: Endpoint) -> anyhow::Result<Client> {
+    Ok(Client::new(endpoint))
+}
+
+impl Client {
+    /// Build a new flusher instance pointed at the provided endpoint.
+    /// The sink and underlying thread are initialized lazily when the first metric is sent.
+    pub fn new(endpoint: Endpoint) -> Client {
+        // defer initialization of the client until the first metric is sent and we definitely know the
+        // client is going to be used to communicate with the endpoint.
+        Client {
+            endpoint: Some(endpoint),
+            ..Default::default()
+        }
+    }
+
+    /// Create a [`Client`] backed by a `MetricSinkWorker` running on the
+    /// provided [`SharedRuntime`].
+    ///
+    /// Returns the client and a [`WorkerHandle`] that can be used to stop the
+    /// worker independently of the runtime.
+    ///
+    /// # Errors
+    /// Returns an error if the endpoint is invalid or the worker cannot be spawned.
+    #[cfg(feature = "shared-runtime")]
+    pub fn new_with_shared_runtime(
+        endpoint: Endpoint,
+        runtime: &SharedRuntime,
+    ) -> anyhow::Result<(Client, WorkerHandle)> {
+        let (sink, handle) = shared_runtime_sink::create_shared_runtime_sink(&endpoint, runtime)?;
+
+        let statsd_client = StatsdClient::from_sink("", sink);
+
+        let client = Client {
+            client: Mutex::new(Arc::new(Some(statsd_client))),
+            // The endpoint must be stored: `get_or_init_client` returns an empty client whenever
+            // `endpoint` is `None`, which would silently drop every metric sent through this
+            // client. Because `client` is already populated, no second client is ever created.
+            endpoint: Some(endpoint),
+        };
+
+        Ok((client, handle))
+    }
+
+    /// Send a vector of DogStatsDActionOwned, this is the same as `send` except it uses the "owned"
+    /// version of DogStatsDAction. See the docs for DogStatsDActionOwned for details.
+    pub fn send_owned(&self, actions: Vec<DogStatsDActionOwned>) {
+        let handle = match self.get_or_init_client() {
+            Ok(handle) => handle,
+            Err(e) => {
+                error!(?e, "Failed to get client");
+                return;
+            }
+        };
+
+        // No client available: nothing to send.
+        let Some(statsd) = &*handle else {
+            return;
+        };
+
+        for action in actions {
+            // Each arm builds the metric for its variant and attaches the owned tags by reference.
+            let outcome = match action {
+                DogStatsDActionOwned::Count(name, value, tags) => {
+                    do_send(statsd.count_with_tags(name.as_ref(), value), &tags)
+                }
+                DogStatsDActionOwned::Distribution(name, value, tags) => {
+                    do_send(statsd.distribution_with_tags(name.as_ref(), value), &tags)
+                }
+                DogStatsDActionOwned::Gauge(name, value, tags) => {
+                    do_send(statsd.gauge_with_tags(name.as_ref(), value), &tags)
+                }
+                DogStatsDActionOwned::Histogram(name, value, tags) => {
+                    do_send(statsd.histogram_with_tags(name.as_ref(), value), &tags)
+                }
+                DogStatsDActionOwned::Set(name, value, tags) => {
+                    do_send(statsd.set_with_tags(name.as_ref(), value), &tags)
+                }
+            };
+            // A failed metric is logged and does not stop the remaining actions.
+            if let Err(err) = outcome {
+                error!(?err, "Error while sending metric");
+            }
+        }
+    }
+
+    /// Send a vector of DogStatsDAction, this is the same as `send_owned` except it only borrows
+    /// the provided values. See the docs for DogStatsDActionOwned for details.
+    pub fn send<'a, T: AsRef<str>, V: IntoIterator<Item = &'a Tag>>(
+        &self,
+        actions: Vec<DogStatsDAction<'a, T, V>>,
+    ) {
+        let client_opt = match self.get_or_init_client() {
+            Ok(client) => client,
+            Err(e) => {
+                error!(?e, "Failed to get client");
+                return;
+            }
+        };
+        if let Some(client) = &*client_opt {
+            for action in actions {
+                if let Err(err) = match action {
+                    DogStatsDAction::Count(metric, value, tags) => {
+                        do_send(client.count_with_tags(metric.as_ref(), value), tags)
+                    }
+                    DogStatsDAction::Distribution(metric, value, tags) => {
+                        do_send(client.distribution_with_tags(metric.as_ref(), value), tags)
+                    }
+                    DogStatsDAction::Gauge(metric, value, tags) => {
+                        do_send(client.gauge_with_tags(metric.as_ref(), value), tags)
+                    }
+                    DogStatsDAction::Histogram(metric, value, tags) => {
+                        do_send(client.histogram_with_tags(metric.as_ref(), value), tags)
+                    }
+                    DogStatsDAction::Set(metric, value, tags) => {
+                        do_send(client.set_with_tags(metric.as_ref(), value), tags)
+                    }
+                } {
+                    error!(?err, "Error while sending metric");
+                }
+            }
+        }
+    }
+
+    /// Returns the shared StatsD client, creating it on first use.
+    ///
+    /// An already-built client is returned whether or not `endpoint` is set, so clients built by
+    /// `new_with_shared_runtime` (which pre-populate `client`) work too. Without a client and
+    /// without an endpoint the returned `Arc` holds `None`.
+    ///
+    /// # Errors
+    /// Fails if the mutex is poisoned or the client cannot be created for the endpoint.
+    fn get_or_init_client(&self) -> anyhow::Result<Arc<Option<StatsdClient>>> {
+        let mut client_guard = self
+            .client
+            .lock()
+            .map_err(|e| anyhow!("Failed to acquire dogstatsd client lock: {e}"))?;
+
+        // Check for an existing client first: it may exist even when `endpoint` is `None`.
+        if client_guard.is_some() {
+            return Ok(Arc::clone(&client_guard));
+        }
+
+        let Some(endpoint) = &self.endpoint else {
+            // Nothing to build a client from; hand back the empty slot.
+            return Ok(Arc::clone(&client_guard));
+        };
+
+        let client = Arc::new(Some(create_client(endpoint)?));
+        *client_guard = Arc::clone(&client);
+        Ok(client)
+    }
+}
+
+/// Attaches every tag in `tags` to `builder` and sends the metric.
+///
+/// # Errors
+/// Returns the error reported by `try_send`.
+fn do_send<'m, 't, T, V: IntoIterator<Item = &'t Tag>>(
+    mut builder: MetricBuilder<'m, '_, T>,
+    tags: V,
+) -> anyhow::Result<()>
+where
+    T: Metric + From<String>,
+    't: 'm,
+{
+    for tag in tags {
+        builder = builder.with_tag_value(tag.as_ref());
+    }
+    builder.try_send()?;
+    Ok(())
+}
+
+/// Builds a queuing StatsD client for `endpoint`: a Unix datagram sink for `unix` URLs (Unix
+/// targets only), a UDP sink for everything else.
+fn create_client(endpoint: &Endpoint) -> anyhow::Result<StatsdClient> {
+    let sink = match endpoint.url.scheme_str() {
+        // Unix sockets do not exist on other targets, so this arm must not be compiled there.
+        #[cfg(unix)]
+        Some("unix") => {
+            QueuingMetricSink::with_capacity(sink::create_unix_sink(endpoint)?, QUEUE_SIZE)
+        }
+        _ => QueuingMetricSink::with_capacity(sink::create_udp_sink(endpoint)?, QUEUE_SIZE),
+    };
+    Ok(StatsdClient::from_sink("", sink))
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use crate::action::DogStatsDAction::{Count, Distribution, Gauge, Histogram, Set};
+    #[cfg(unix)]
+    use http::Uri;
+    #[cfg(unix)]
+    use libdd_common::connector::uds::socket_path_to_uri;
+    use libdd_common::{tag, Endpoint};
+    use std::net;
+    use std::sync::Arc;
+    use std::time::Duration;
+
+    #[test]
+    #[cfg_attr(miri, ignore)]
+    fn test_flusher() {
+        let socket = net::UdpSocket::bind("127.0.0.1:0").expect("failed to bind host socket");
+        let _ = socket.set_read_timeout(Some(Duration::from_millis(500)));
+
+        let flusher = new(Endpoint::from_slice(
+            socket.local_addr().unwrap().to_string().as_str(),
+        ))
+        .unwrap();
+        flusher.send(vec![
+            Count("test_count", 3, &vec![tag!("foo", "bar")]),
+            Count("test_neg_count", -2, &vec![]),
+            Distribution("test_distribution", 4.2, &vec![]),
+            Gauge("test_gauge", 7.6, &vec![]),
+            Histogram("test_histogram", 8.0, &vec![]),
+            Set("test_set", 9, &vec![tag!("the", "end")]),
+            Set("test_neg_set", -1, &vec![]),
+        ]);
+
+        fn read(socket: &net::UdpSocket) -> String {
+            let mut buf = [0; 100];
+            socket.recv(&mut buf).expect("No data");
+            let datagram = String::from_utf8_lossy(buf.strip_suffix(&[0]).unwrap());
+            datagram.trim_matches(char::from(0)).to_string()
+        }
+
+        assert_eq!("test_count:3|c|#foo:bar", read(&socket));
+        assert_eq!("test_neg_count:-2|c", read(&socket));
+        assert_eq!("test_distribution:4.2|d", read(&socket));
+        assert_eq!("test_gauge:7.6|g", read(&socket));
+        assert_eq!("test_histogram:8|h", read(&socket));
+        assert_eq!("test_set:9|s|#the:end", read(&socket));
+        assert_eq!("test_neg_set:-1|s", read(&socket));
+    }
+
+    #[test]
+    #[cfg_attr(miri, ignore)]
+    fn test_create_client_udp() {
+        let res = create_client(&Endpoint::default());
+        assert!(res.is_err());
+        assert_eq!("invalid host", res.unwrap_err().to_string().as_str());
+
+        let res = create_client(&Endpoint::from_slice("localhost:99999"));
+        assert!(res.is_err());
+        assert_eq!("invalid port", res.unwrap_err().to_string().as_str());
+
+        let res = create_client(&Endpoint::from_slice("localhost:80"));
+        assert!(res.is_ok());
+
+        let res = create_client(&Endpoint::from_slice("http://localhost:80"));
+        assert!(res.is_ok());
+    }
+
+    #[test]
+    #[cfg(unix)]
+    #[cfg_attr(miri, ignore)]
+    fn test_create_client_unix_domain_socket() {
+        let res = create_client(&Endpoint::from_url(
+            "unix://localhost:80".parse::<Uri>().unwrap(),
+        ));
+        assert!(res.is_err());
+        assert_eq!(
+            "failed to build socket path from uri: invalid url",
+            res.unwrap_err().to_string().as_str()
+        );
+
+        let res = create_client(&Endpoint::from_url(
+            socket_path_to_uri("/path/to/a/socket.sock".as_ref()).unwrap(),
+        ));
+        assert!(res.is_ok());
+    }
+
+    #[tokio::test]
+    #[cfg_attr(miri, ignore)]
+    async fn test_thread_safety() {
+        let socket = net::UdpSocket::bind("127.0.0.1:0").expect("failed to bind host socket");
+        let _ = socket.set_read_timeout(Some(Duration::from_millis(500)));
+        let endpoint = Endpoint::from_slice(socket.local_addr().unwrap().to_string().as_str());
+        let flusher = Arc::new(new(endpoint.clone()).unwrap());
+
+        {
+            let client = flusher
+                .client
+                .lock()
+                .expect("failed to obtain lock on client");
+            assert!(client.is_none());
+        }
+
+        let tasks: Vec<_> = (0..10)
+            .map(|_| {
+                let flusher_clone = Arc::clone(&flusher);
+                tokio::spawn(async move {
+                    flusher_clone.send(vec![
+                        Count("test_count", 3, &vec![tag!("foo", "bar")]),
+                        Count("test_neg_count", -2, &vec![]),
+                        Distribution("test_distribution", 4.2, &vec![]),
+                        Gauge("test_gauge", 7.6, &vec![]),
+                        Histogram("test_histogram", 8.0, &vec![]),
+                        Set("test_set", 9, &vec![tag!("the", "end")]),
+                        Set("test_neg_set", -1, &vec![]),
+                    ]);
+
+                    let client = flusher_clone
+                        .client
+                        .lock()
+                        .expect("failed to obtain lock on client within send thread");
+                    assert!(client.is_some());
+                })
+            })
+            .collect();
+
+        for task in tasks {
+            task.await.unwrap();
+        }
+    }
+}
diff --git a/libdd-dogstatsd-client/src/client/shared_runtime_sink.rs b/libdd-dogstatsd-client/src/client/shared_runtime_sink.rs
new file mode 100644
index 0000000000..e705c7b84b
--- /dev/null
+++ b/libdd-dogstatsd-client/src/client/shared_runtime_sink.rs
@@ -0,0 +1,151 @@
+// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
+// SPDX-License-Identifier: Apache-2.0
+
+use anyhow::anyhow;
+use async_trait::async_trait;
+use cadence::{MetricSink, SinkStats};
+use libdd_common::Endpoint;
+use libdd_shared_runtime::{worker::Worker, SharedRuntime, WorkerHandle};
+use std::fmt;
+use std::io;
+use std::panic::RefUnwindSafe;
+use std::sync::Arc;
+use tokio::sync::mpsc;
+use tracing::error;
+
+use super::{sink, QUEUE_SIZE};
+
+/// A [`MetricSink`] that offloads sent metrics to a [`SharedRuntime`].
+#[derive(Clone)]
+pub struct SharedRuntimeMetricSink {
+    /// Producer side of the bounded channel drained by the worker.
+    sender: mpsc::Sender<String>,
+    /// The wrapped sink, used here only for `flush` and `stats`.
+    sink: Arc<dyn MetricSink + Send + Sync + RefUnwindSafe>,
+}
+
+impl fmt::Debug for SharedRuntimeMetricSink {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "SharedRuntimeMetricSink {{ {:?} }}", self.sender)
+    }
+}
+
+impl MetricSink for SharedRuntimeMetricSink {
+    /// Queues `metric` on the channel without blocking and reports its length as written.
+    ///
+    /// A full channel maps to `WouldBlock`, a closed one to `BrokenPipe`.
+    fn emit(&self, metric: &str) -> io::Result<usize> {
+        let len = metric.len();
+        self.sender
+            .try_send(metric.to_owned())
+            .map(|_| len)
+            .map_err(|e| match e {
+                mpsc::error::TrySendError::Full(_) => {
+                    io::Error::new(io::ErrorKind::WouldBlock, "dogstatsd channel full")
+                }
+                mpsc::error::TrySendError::Closed(_) => {
+                    io::Error::new(io::ErrorKind::BrokenPipe, "dogstatsd channel closed")
+                }
+            })
+    }
+
+    /// Flushes the wrapped sink directly; metrics still waiting in the channel are not touched.
+    fn flush(&self) -> Result<(), std::io::Error> {
+        self.sink.flush()
+    }
+
+    fn stats(&self) -> SinkStats {
+        self.sink.stats()
+    }
+}
+
+/// A [`Worker`] that drains metrics from a channel and forwards them to a
+/// wrapped [`MetricSink`] (e.g. `UdpMetricSink`).
+pub struct MetricSinkWorker {
+    /// Consumer side of the metric channel.
+    receiver: mpsc::Receiver<String>,
+    /// Sink every received metric is forwarded to.
+    sink: Arc<dyn MetricSink + Send + Sync + RefUnwindSafe>,
+    /// Metric received by `trigger` and not yet forwarded by `run`.
+    pending: Option<String>,
+}
+
+impl std::fmt::Debug for MetricSinkWorker {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        // Only `pending` is printed; the remaining fields are elided.
+        let mut out = f.debug_struct("MetricSinkWorker");
+        out.field("pending", &self.pending);
+        out.finish_non_exhaustive()
+    }
+}
+
+#[async_trait]
+impl Worker for MetricSinkWorker {
+    /// Waits for the next metric on the channel and parks it in `pending`.
+    ///
+    /// Once the channel is closed this future never completes, which leaves
+    /// it to the SharedRuntime to cancel it at the next yield point.
+    async fn trigger(&mut self) {
+        if let Some(metric) = self.receiver.recv().await {
+            self.pending = Some(metric);
+        } else {
+            // Channel closed — park until cancelled by the runtime.
+            std::future::pending::<()>().await;
+        }
+    }
+
+    /// Hands the metric parked by `trigger`, if any, to the wrapped sink.
+    async fn run(&mut self) {
+        let Some(metric) = self.pending.take() else {
+            return;
+        };
+        if let Err(e) = self.sink.emit(&metric) {
+            error!(?e, "MetricSinkWorker: failed to emit metric");
+        }
+    }
+
+    /// Emits the parked metric and everything still queued, then flushes the wrapped sink.
+    async fn shutdown(&mut self) {
+        let parked = self.pending.take();
+        let receiver = &mut self.receiver;
+        let sink = &self.sink;
+
+        // The parked metric goes first, followed by whatever the channel yields without waiting.
+        let queued = std::iter::from_fn(|| receiver.try_recv().ok());
+        for metric in parked.into_iter().chain(queued) {
+            if let Err(e) = sink.emit(&metric) {
+                error!(?e, "MetricSinkWorker: failed to emit metric on shutdown");
+            }
+        }
+
+        if let Err(e) = sink.flush() {
+            error!(?e, "MetricSinkWorker: failed to flush sink on shutdown");
+        }
+    }
+
+    /// Reset the worker in the child process after a fork.
+    fn reset(&mut self) {
+        // Discard the parked metric and everything still queued without emitting any of it.
+        self.pending = None;
+        while self.receiver.try_recv().is_ok() {}
+    }
+}
+
+/// Builds the sink/worker pair used by shared-runtime clients.
+///
+/// Creates a channel of `QUEUE_SIZE` entries, wraps the endpoint's sink (Unix datagram for `unix`
+/// URLs on Unix targets, UDP otherwise) and spawns a [`MetricSinkWorker`] on `runtime` to drain
+/// the channel into it.
+///
+/// # Errors
+/// Fails if the sink cannot be created for `endpoint` or the worker cannot be spawned.
+pub fn create_shared_runtime_sink(
+    endpoint: &Endpoint,
+    runtime: &SharedRuntime,
+) -> anyhow::Result<(SharedRuntimeMetricSink, WorkerHandle)> {
+    let (tx, rx) = mpsc::channel(QUEUE_SIZE);
+
+    let sink: Arc<dyn MetricSink + Send + Sync + RefUnwindSafe> = match endpoint.url.scheme_str() {
+        // Unix sockets do not exist on other targets, so this arm must not be compiled there.
+        #[cfg(unix)]
+        Some("unix") => Arc::new(sink::create_unix_sink(endpoint)?),
+        _ => Arc::new(sink::create_udp_sink(endpoint)?),
+    };
+
+    let sink_worker = MetricSinkWorker {
+        receiver: rx,
+        sink: Arc::clone(&sink),
+        pending: None,
+    };
+
+    let handle = runtime
+        .spawn_worker(sink_worker, true)
+        .map_err(|e| anyhow!("failed to spawn MetricSinkWorker: {e}"))?;
+
+    let shared_sink = SharedRuntimeMetricSink { sender: tx, sink };
+
+    Ok((shared_sink, handle))
+}
diff --git a/libdd-dogstatsd-client/src/client/sink.rs b/libdd-dogstatsd-client/src/client/sink.rs
new file mode 100644
index 0000000000..d41311f9f9
--- /dev/null
+++ b/libdd-dogstatsd-client/src/client/sink.rs
@@ -0,0 +1,56 @@
+use libdd_common::Endpoint;
+
+use anyhow::anyhow;
+use cadence::UdpMetricSink;
+#[cfg(unix)]
+use cadence::UnixMetricSink;
+#[cfg(unix)]
+use libdd_common::connector::uds::socket_path_from_uri;
+use std::net::{ToSocketAddrs, UdpSocket};
+#[cfg(unix)]
+use std::os::unix::net::UnixDatagram;
+
+/// Builds a non-blocking Unix datagram sink for the socket path encoded in `endpoint.url`.
+///
+/// Only compiled on Unix targets, matching the `#[cfg(unix)]` imports it relies on.
+#[cfg(unix)]
+pub(crate) fn create_unix_sink(endpoint: &Endpoint) -> anyhow::Result<UnixMetricSink> {
+    let socket =
+        UnixDatagram::unbound().map_err(|e| anyhow!("failed to make unbound unix port: {}", e))?;
+    socket
+        .set_nonblocking(true)
+        .map_err(|e| anyhow!("failed to set socket to nonblocking: {}", e))?;
+    Ok(UnixMetricSink::from(
+        socket_path_from_uri(&endpoint.url)
+            .map_err(|e| anyhow!("failed to build socket path from uri: {}", e))?,
+        socket,
+    ))
+}
+
+/// Builds a non-blocking UDP sink targeting the host and port of `endpoint.url`.
+///
+/// Fails with "invalid host" / "invalid port" when the URL lacks that component.
+pub(crate) fn create_udp_sink(endpoint: &Endpoint) -> anyhow::Result<UdpMetricSink> {
+    let host = endpoint.url.host().ok_or_else(|| anyhow!("invalid host"))?;
+    let port = endpoint
+        .url
+        .port()
+        .ok_or_else(|| anyhow!("invalid port"))?
+        .as_u16();
+
+    let server_address = (host, port)
+        .to_socket_addrs()?
+        .next()
+        .ok_or_else(|| anyhow!("invalid address"))?;
+
+    // Bind a local wildcard socket of the same address family as the resolved server address.
+    let socket = if server_address.is_ipv4() {
+        UdpSocket::bind("0.0.0.0:0").map_err(|e| anyhow!("failed to bind to 0.0.0.0:0: {}", e))?
+    } else {
+        UdpSocket::bind("[::]:0").map_err(|e| anyhow!("failed to bind to [::]:0: {}", e))?
+    };
+    socket.set_nonblocking(true)?;
+
+    UdpMetricSink::from((host, port), socket)
+        .map_err(|e| anyhow!("failed to build UdpMetricSink: {}", e))
+}
diff --git a/libdd-dogstatsd-client/src/lib.rs b/libdd-dogstatsd-client/src/lib.rs
index f41d9e837a..e047776f89 100644
--- a/libdd-dogstatsd-client/src/lib.rs
+++ b/libdd-dogstatsd-client/src/lib.rs
@@ -10,406 +10,10 @@
 //! dogstatsd-client implements a client to emit metrics to a dogstatsd server.
 //! This is made use of in at least the data-pipeline and sidecar crates.
 
-use libdd_common::tag::Tag;
-use libdd_common::Endpoint;
-use serde::{Deserialize, Serialize};
-use std::fmt::Debug;
-use tracing::error;
+/// Types representing dogstatsd actions
+mod action;
+pub use action::{DogStatsDAction, DogStatsDActionOwned};
 
-use anyhow::anyhow;
-use cadence::prelude::*;
-#[cfg(unix)]
-use cadence::UnixMetricSink;
-use cadence::{Metric, MetricBuilder, QueuingMetricSink, StatsdClient, UdpMetricSink};
-#[cfg(unix)]
-use libdd_common::connector::uds::socket_path_from_uri;
-use std::net::{ToSocketAddrs, UdpSocket};
-#[cfg(unix)]
-use std::os::unix::net::UnixDatagram;
-use std::sync::{Arc, Mutex};
-
-// Queue with a maximum capacity of 32K elements
-const QUEUE_SIZE: usize = 32 * 1024;
-
-/// The `DogStatsDActionOwned` enum gathers the metric types that can be sent to the DogStatsD
-/// server. This type takes ownership of the relevant data to support the sidecar better.
-/// For documentation on the dogstatsd metric types: https://docs.datadoghq.com/metrics/types/?tab=count#metric-types -/// -/// Originally I attempted to combine this type with `DogStatsDAction` but this GREATLY complicates -/// the types to the point of insanity. I was unable to come up with a satisfactory approach that -/// allows both the data-pipeline and sidecar crates to use the same type. If a future rustacean -/// wants to take a stab and open a PR please do so! -#[derive(Debug, Serialize, Deserialize)] -pub enum DogStatsDActionOwned { - #[allow(missing_docs)] - Count(String, i64, Vec), - #[allow(missing_docs)] - Distribution(String, f64, Vec), - #[allow(missing_docs)] - Gauge(String, f64, Vec), - #[allow(missing_docs)] - Histogram(String, f64, Vec), - /// Cadence only support i64 type as value - /// but Golang implementation uses string (https://github.com/DataDog/datadog-go/blob/331d24832f7eac97b091efd696278fe2c4192b29/statsd/statsd.go#L230) - /// and PHP implementation uses float or string (https://github.com/DataDog/php-datadogstatsd/blob/0efdd1c38f6d3dd407efbb899ad1fd2e5cd18085/src/DogStatsd.php#L251) - Set(String, i64, Vec), -} - -/// The `DogStatsDAction` enum gathers the metric types that can be sent to the DogStatsD server. 
-#[derive(Debug, Serialize, Deserialize)] -pub enum DogStatsDAction<'a, T: AsRef, V: IntoIterator> { - // TODO: instead of AsRef we can accept a marker Trait that users of this crate implement - #[allow(missing_docs)] - Count(T, i64, V), - #[allow(missing_docs)] - Distribution(T, f64, V), - #[allow(missing_docs)] - Gauge(T, f64, V), - #[allow(missing_docs)] - Histogram(T, f64, V), - /// Cadence only support i64 type as value - /// but Golang implementation uses string (https://github.com/DataDog/datadog-go/blob/331d24832f7eac97b091efd696278fe2c4192b29/statsd/statsd.go#L230) - /// and PHP implementation uses float or string (https://github.com/DataDog/php-datadogstatsd/blob/0efdd1c38f6d3dd407efbb899ad1fd2e5cd18085/src/DogStatsd.php#L251) - Set(T, i64, V), -} - -/// A dogstatsd-client that flushes stats to a given endpoint. -#[derive(Debug, Default)] -pub struct Client { - client: Mutex>>, - endpoint: Option, -} - -/// Build a new flusher instance pointed at the provided endpoint. -/// Returns error if the provided endpoint is not valid. -pub fn new(endpoint: Endpoint) -> anyhow::Result { - // defer initialization of the client until the first metric is sent and we definitely know the - // client is going to be used to communicate with the endpoint. - Ok(Client { - endpoint: Some(endpoint), - ..Default::default() - }) -} - -impl Client { - /// Send a vector of DogStatsDActionOwned, this is the same as `send` except it uses the "owned" - /// version of DogStatsDAction. See the docs for DogStatsDActionOwned for details. 
- pub fn send_owned(&self, actions: Vec) { - let client_opt = match self.get_or_init_client() { - Ok(client) => client, - Err(e) => { - error!(?e, "Failed to get client"); - return; - } - }; - - if let Some(client) = &*client_opt { - for action in actions { - if let Err(err) = match action { - DogStatsDActionOwned::Count(metric, value, tags) => { - do_send(client.count_with_tags(metric.as_ref(), value), &tags) - } - DogStatsDActionOwned::Distribution(metric, value, tags) => { - do_send(client.distribution_with_tags(metric.as_ref(), value), &tags) - } - DogStatsDActionOwned::Gauge(metric, value, tags) => { - do_send(client.gauge_with_tags(metric.as_ref(), value), &tags) - } - DogStatsDActionOwned::Histogram(metric, value, tags) => { - do_send(client.histogram_with_tags(metric.as_ref(), value), &tags) - } - DogStatsDActionOwned::Set(metric, value, tags) => { - do_send(client.set_with_tags(metric.as_ref(), value), &tags) - } - } { - error!(?err, "Error while sending metric"); - } - } - } - } - - /// Send a vector of DogStatsDAction, this is the same as `send_owned` except it only borrows - /// the provided values.See the docs for DogStatsDActionOwned for details. 
- pub fn send<'a, T: AsRef, V: IntoIterator>( - &self, - actions: Vec>, - ) { - let client_opt = match self.get_or_init_client() { - Ok(client) => client, - Err(e) => { - error!(?e, "Failed to get client"); - return; - } - }; - if let Some(client) = &*client_opt { - for action in actions { - if let Err(err) = match action { - DogStatsDAction::Count(metric, value, tags) => { - let metric_builder = client.count_with_tags(metric.as_ref(), value); - do_send(metric_builder, tags) - } - DogStatsDAction::Distribution(metric, value, tags) => { - do_send(client.distribution_with_tags(metric.as_ref(), value), tags) - } - DogStatsDAction::Gauge(metric, value, tags) => { - do_send(client.gauge_with_tags(metric.as_ref(), value), tags) - } - DogStatsDAction::Histogram(metric, value, tags) => { - do_send(client.histogram_with_tags(metric.as_ref(), value), tags) - } - DogStatsDAction::Set(metric, value, tags) => { - do_send(client.set_with_tags(metric.as_ref(), value), tags) - } - } { - error!(?err, "Error while sending metric"); - } - } - } - } - - fn get_or_init_client(&self) -> anyhow::Result>> { - if let Some(endpoint) = &self.endpoint { - let mut client_guard = self - .client - .lock() - .map_err(|e| anyhow!("Failed to acquire dogstatsd client lock: {e}"))?; - return if client_guard.is_some() { - Ok(client_guard.clone()) - } else { - let client = Arc::new(Some(create_client(endpoint)?)); - *client_guard = client.clone(); - Ok(client) - }; - } - - Ok(None.into()) - } -} - -fn do_send<'m, 't, T, V: IntoIterator>( - mut builder: MetricBuilder<'m, '_, T>, - tags: V, -) -> anyhow::Result<()> -where - T: Metric + From, - 't: 'm, -{ - let mut tags_iter = tags.into_iter(); - let mut tag_opt = tags_iter.next(); - #[allow(clippy::unwrap_used)] - while tag_opt.is_some() { - builder = builder.with_tag_value(tag_opt.unwrap().as_ref()); - tag_opt = tags_iter.next(); - } - builder.try_send()?; - Ok(()) -} - -fn create_client(endpoint: &Endpoint) -> anyhow::Result { - match 
endpoint.url.scheme_str() { - #[cfg(unix)] - Some("unix") => { - let socket = UnixDatagram::unbound() - .map_err(|e| anyhow!("failed to make unbound unix port: {}", e))?; - socket - .set_nonblocking(true) - .map_err(|e| anyhow!("failed to set socket to nonblocking: {}", e))?; - let sink = QueuingMetricSink::with_capacity( - UnixMetricSink::from( - socket_path_from_uri(&endpoint.url) - .map_err(|e| anyhow!("failed to build socket path from uri: {}", e))?, - socket, - ), - QUEUE_SIZE, - ); - - Ok(StatsdClient::from_sink("", sink)) - } - _ => { - let host = endpoint.url.host().ok_or(anyhow!("invalid host"))?; - let port = endpoint.url.port().ok_or(anyhow!("invalid port"))?.as_u16(); - - let server_address = (host, port) - .to_socket_addrs()? - .next() - .ok_or(anyhow!("invalid address"))?; - - let socket = if server_address.is_ipv4() { - UdpSocket::bind("0.0.0.0:0") - .map_err(|e| anyhow!("failed to bind to 0.0.0.0:0: {}", e))? - } else { - UdpSocket::bind("[::]:0").map_err(|e| anyhow!("failed to bind to [::]:0: {}", e))? 
- }; - socket.set_nonblocking(true)?; - - let sink = QueuingMetricSink::with_capacity( - UdpMetricSink::from((host, port), socket) - .map_err(|e| anyhow!("failed to build UdpMetricSink: {}", e))?, - QUEUE_SIZE, - ); - - Ok(StatsdClient::from_sink("", sink)) - } - } -} - -#[cfg(test)] -mod test { - use crate::DogStatsDAction::{Count, Distribution, Gauge, Histogram, Set}; - use crate::{create_client, new, DogStatsDActionOwned}; - #[cfg(unix)] - use http::Uri; - #[cfg(unix)] - use libdd_common::connector::uds::socket_path_to_uri; - use libdd_common::{tag, Endpoint}; - use std::net; - use std::sync::Arc; - use std::time::Duration; - - #[test] - #[cfg_attr(miri, ignore)] - fn test_flusher() { - let socket = net::UdpSocket::bind("127.0.0.1:0").expect("failed to bind host socket"); - let _ = socket.set_read_timeout(Some(Duration::from_millis(500))); - - let flusher = new(Endpoint::from_slice( - socket.local_addr().unwrap().to_string().as_str(), - )) - .unwrap(); - flusher.send(vec![ - Count("test_count", 3, &vec![tag!("foo", "bar")]), - Count("test_neg_count", -2, &vec![]), - Distribution("test_distribution", 4.2, &vec![]), - Gauge("test_gauge", 7.6, &vec![]), - Histogram("test_histogram", 8.0, &vec![]), - Set("test_set", 9, &vec![tag!("the", "end")]), - Set("test_neg_set", -1, &vec![]), - ]); - - fn read(socket: &net::UdpSocket) -> String { - let mut buf = [0; 100]; - socket.recv(&mut buf).expect("No data"); - let datagram = String::from_utf8_lossy(buf.strip_suffix(&[0]).unwrap()); - datagram.trim_matches(char::from(0)).to_string() - } - - assert_eq!("test_count:3|c|#foo:bar", read(&socket)); - assert_eq!("test_neg_count:-2|c", read(&socket)); - assert_eq!("test_distribution:4.2|d", read(&socket)); - assert_eq!("test_gauge:7.6|g", read(&socket)); - assert_eq!("test_histogram:8|h", read(&socket)); - assert_eq!("test_set:9|s|#the:end", read(&socket)); - assert_eq!("test_neg_set:-1|s", read(&socket)); - } - - #[test] - #[cfg_attr(miri, ignore)] - fn test_create_client_udp() 
{ - let res = create_client(&Endpoint::default()); - assert!(res.is_err()); - assert_eq!("invalid host", res.unwrap_err().to_string().as_str()); - - let res = create_client(&Endpoint::from_slice("localhost:99999")); - assert!(res.is_err()); - assert_eq!("invalid port", res.unwrap_err().to_string().as_str()); - - let res = create_client(&Endpoint::from_slice("localhost:80")); - assert!(res.is_ok()); - - let res = create_client(&Endpoint::from_slice("http://localhost:80")); - assert!(res.is_ok()); - } - - #[test] - #[cfg(unix)] - #[cfg_attr(miri, ignore)] - fn test_create_client_unix_domain_socket() { - let res = create_client(&Endpoint::from_url( - "unix://localhost:80".parse::().unwrap(), - )); - assert!(res.is_err()); - assert_eq!( - "failed to build socket path from uri: invalid url", - res.unwrap_err().to_string().as_str() - ); - - let res = create_client(&Endpoint::from_url( - socket_path_to_uri("/path/to/a/socket.sock".as_ref()).unwrap(), - )); - assert!(res.is_ok()); - } - - #[test] - fn test_owned_sync() { - // This test ensures that if a new variant is added to either `DogStatsDActionOwned` or - // `DogStatsDAction` this test will NOT COMPILE to act as a reminder that BOTH locations - // must be updated. 
- let owned_act = DogStatsDActionOwned::Count("test".to_string(), 1, vec![]); - match owned_act { - DogStatsDActionOwned::Count(_, _, _) => {} - DogStatsDActionOwned::Distribution(_, _, _) => {} - DogStatsDActionOwned::Gauge(_, _, _) => {} - DogStatsDActionOwned::Histogram(_, _, _) => {} - DogStatsDActionOwned::Set(_, _, _) => {} - } - - let act = Count("test".to_string(), 1, vec![]); - match act { - Count(_, _, _) => {} - Distribution(_, _, _) => {} - Gauge(_, _, _) => {} - Histogram(_, _, _) => {} - Set(_, _, _) => {} - } - // TODO: when std::mem::variant_count is in stable we can do this instead - // assert_eq!( - // std::mem::variant_count::(), - // std::mem::variant_count::>>(), - // "DogStatsDActionOwned and DogStatsDAction should have the same number of variants, - // did you forget to update one?", ); - } - - #[tokio::test] - #[cfg_attr(miri, ignore)] - async fn test_thread_safety() { - let socket = net::UdpSocket::bind("127.0.0.1:0").expect("failed to bind host socket"); - let _ = socket.set_read_timeout(Some(Duration::from_millis(500))); - let endpoint = Endpoint::from_slice(socket.local_addr().unwrap().to_string().as_str()); - let flusher = Arc::new(new(endpoint.clone()).unwrap()); - - { - let client = flusher - .client - .lock() - .expect("failed to obtain lock on client"); - assert!(client.is_none()); - } - - let tasks: Vec<_> = (0..10) - .map(|_| { - let flusher_clone = Arc::clone(&flusher); - tokio::spawn(async move { - flusher_clone.send(vec![ - Count("test_count", 3, &vec![tag!("foo", "bar")]), - Count("test_neg_count", -2, &vec![]), - Distribution("test_distribution", 4.2, &vec![]), - Gauge("test_gauge", 7.6, &vec![]), - Histogram("test_histogram", 8.0, &vec![]), - Set("test_set", 9, &vec![tag!("the", "end")]), - Set("test_neg_set", -1, &vec![]), - ]); - - let client = flusher_clone - .client - .lock() - .expect("failed to obtain lock on client within send thread"); - assert!(client.is_some()); - }) - }) - .collect(); - - for task in tasks { - 
task.await.unwrap(); - } - } -} +/// Dogstatsd client used to send metrics +mod client; +pub use client::{new, Client};