feat(dogstatsd): Add DogStatsD packet forwarding support#1712
Conversation
This comment has been minimized.
This comment has been minimized.
Binary Size Analysis (Agent Data Plane)Baseline: 8c475ed · Comparison: 8c52510 · diff ✅ Binary size difference within thresholdChanges by Module
Detailed Symbol Changes |
Regression Detector (Agent Data Plane)Run ID: Optimization Goals: ✅ No significant changes detectedFine details of change detection per experiment (35)Experiments configured
Bounds Checks: ✅ Passed (5)
ExplanationA change is flagged as a regression when |Δ mean %| > 5.00% in the regressing direction for its optimization goal AND SMP marks the experiment as a regression ( |
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: da487305bd
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 8d41a26b2f
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
| let mut packet_forward_flush = interval(packet_forwarder.as_ref().map_or( | ||
| DEFAULT_DOGSTATSD_PACKET_BUFFER_FLUSH_TIMEOUT, | ||
| PacketForwarder::udp_batch_flush_timeout, | ||
| )); |
There was a problem hiding this comment.
Guard zero flush timeout before creating interval
dogstatsd_packet_buffer_flush_timeout is deserialized via DurationString (which accepts 0), but this value is passed directly into tokio::time::interval; Tokio panics when the interval period is zero. With forwarding enabled and dogstatsd_packet_buffer_flush_timeout: 0, the DogStatsD handler task will panic at startup, so forwarding and ingestion on that listener can stop unexpectedly instead of handling the config safely.
Useful? React with 👍 / 👎.
tobz
left a comment
There was a problem hiding this comment.
I left some specific feedback, but overall, I want to simplify this design: just forward the individual metrics packets (don't differentiate between the outer UDS frame and inner newline-delimited frames, and remove the code that captures/holds on to outer frames in NestedFramer) and avoid all of the "packet buffer" stuff.
My reasoning here is:
- we're still sending all of the same metrics in the end, but we skip needing to handle nested vs unnested frames
- the packet buffer infrastructure on the Agent side is really a consequence of the ingest -> worker model, so modeling it here isn't something we really need to do
- additionally, modeling the packet buffer infrastructure means time-based non-determinism (due to the flushing support) which is harder to test for correctness than if all we do if blast out the individual metric packets in real-time
| /// | ||
| /// Defaults to 0. | ||
| #[serde(rename = "statsd_forward_port", default = "default_statsd_forward_port")] | ||
| statsd_forward_port: i64, |
There was a problem hiding this comment.
We should make this u16. That's the only thing a valid port can be.
| warn!( | ||
| port = config.statsd_forward_port, | ||
| min = MIN_STATSD_FORWARD_PORT, | ||
| max = MAX_STATSD_FORWARD_PORT, | ||
| "Invalid statsd forward port. Packet forwarding disabled." | ||
| ); | ||
| return None; |
There was a problem hiding this comment.
Is this how it works in the Core Agent? If the port is invalid, it just ignores it and packet forwarding is disabled?
| async fn connect(&self) { | ||
| let host = &self.target_host; | ||
| let port = self.target_port; | ||
| let targets = match timeout(FORWARDER_RESOLVE_TIMEOUT, resolve_forward_targets(host, port)).await { |
There was a problem hiding this comment.
We don't actually need to do this separately. tokio::net::UdpSocket::connect already takes the address parameter as anything that implements ToSocketAddrs, which includes (&str, u16).
Using that instead (still coupled with the timeout, though) lets us get rid of a ton of the code here.
Summary
Source
statsd_forward_hostandstatsd_forward_port.statsd_forward_hostandstatsd_forward_portto supported DogStatsD config keys.dogstatsd_packet_buffer_sizeanddogstatsd_packet_buffer_flush_timeoutmarked not applicable for ADP because ADP decodes DogStatsD inline.Design Notes
This PR intentionally forwards decoded DogStatsD messages rather than preserving core Agent UDP packet-buffer grouping or UDS/TCP stream outer-frame grouping.
For example, a stream frame containing:
is forwarded as two UDP messages:
and:
This follows review feedback to avoid emulating the core Agent ingest -> packet-buffer -> worker pipeline in ADP. ADP still forwards the same DogStatsD messages, but it does not attempt byte-for-byte parity of forwarded UDP packet boundaries.
Testing
make fmtcargo test -p saluki-components sources::dogstatsd::tests --lib -- --nocapturecargo test -p saluki-components sources::dogstatsd::framer::tests --lib -- --nocapturecargo test -p saluki-components packet_forwarder --lib -- --nocapturecargo test -p saluki-io deser::framing::tests --lib -- --nocapturecargo test -p panoramic dogstatsd_forwarding --bin panoramic -- --nocapturecargo check --workspacecargo check --workspace --testsgit diff --checkCoverage
statsd_forward_portenable/disable behavior.