|
| 1 | +// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. |
| 2 | +// SPDX-License-Identifier: Apache-2.0 |
| 3 | + |
| 4 | +//! Anonymous sandbox network activity counter aggregation. |
| 5 | +
|
| 6 | +use std::collections::HashMap; |
| 7 | +use std::future::Future; |
| 8 | +use tokio::sync::mpsc; |
| 9 | +use tracing::debug; |
| 10 | + |
| 11 | +#[derive(Debug, Clone)] |
| 12 | +pub struct ActivityEvent { |
| 13 | + pub denied: bool, |
| 14 | + pub deny_group: &'static str, |
| 15 | +} |
| 16 | + |
| 17 | +#[derive(Debug, Clone, PartialEq, Eq)] |
| 18 | +pub struct FlushableActivitySummary { |
| 19 | + pub network_activity_count: u32, |
| 20 | + pub denied_action_count: u32, |
| 21 | + pub denials_by_group: Vec<(String, u32)>, |
| 22 | +} |
| 23 | + |
| 24 | +pub struct ActivityAggregator { |
| 25 | + rx: mpsc::UnboundedReceiver<ActivityEvent>, |
| 26 | + network_activity_count: u32, |
| 27 | + denied_action_count: u32, |
| 28 | + denials_by_group: HashMap<String, u32>, |
| 29 | + flush_interval_secs: u64, |
| 30 | +} |
| 31 | + |
| 32 | +impl ActivityAggregator { |
| 33 | + pub fn new(rx: mpsc::UnboundedReceiver<ActivityEvent>, flush_interval_secs: u64) -> Self { |
| 34 | + Self { |
| 35 | + rx, |
| 36 | + network_activity_count: 0, |
| 37 | + denied_action_count: 0, |
| 38 | + denials_by_group: HashMap::new(), |
| 39 | + flush_interval_secs, |
| 40 | + } |
| 41 | + } |
| 42 | + |
| 43 | + pub async fn run<F, Fut>(mut self, flush_callback: F) |
| 44 | + where |
| 45 | + F: Fn(FlushableActivitySummary) -> Fut, |
| 46 | + Fut: Future<Output = ()>, |
| 47 | + { |
| 48 | + let mut flush_interval = |
| 49 | + tokio::time::interval(std::time::Duration::from_secs(self.flush_interval_secs)); |
| 50 | + flush_interval.tick().await; |
| 51 | + |
| 52 | + loop { |
| 53 | + tokio::select! { |
| 54 | + event = self.rx.recv() => { |
| 55 | + if let Some(event) = event { |
| 56 | + self.ingest(event); |
| 57 | + } else { |
| 58 | + if let Some(summary) = self.drain() { |
| 59 | + flush_callback(summary).await; |
| 60 | + } |
| 61 | + debug!("ActivityAggregator: channel closed, exiting"); |
| 62 | + return; |
| 63 | + } |
| 64 | + } |
| 65 | + _ = flush_interval.tick() => { |
| 66 | + if let Some(summary) = self.drain() { |
| 67 | + debug!( |
| 68 | + count = summary.network_activity_count, |
| 69 | + denied = summary.denied_action_count, |
| 70 | + "ActivityAggregator: flushing anonymous activity summary" |
| 71 | + ); |
| 72 | + flush_callback(summary).await; |
| 73 | + } |
| 74 | + } |
| 75 | + } |
| 76 | + } |
| 77 | + } |
| 78 | + |
| 79 | + fn ingest(&mut self, event: ActivityEvent) { |
| 80 | + self.network_activity_count = self.network_activity_count.saturating_add(1); |
| 81 | + if event.denied { |
| 82 | + self.denied_action_count = self.denied_action_count.saturating_add(1); |
| 83 | + let group = sanitize_deny_group(event.deny_group).to_string(); |
| 84 | + let count = self.denials_by_group.entry(group).or_default(); |
| 85 | + *count = count.saturating_add(1); |
| 86 | + } |
| 87 | + } |
| 88 | + |
| 89 | + fn drain(&mut self) -> Option<FlushableActivitySummary> { |
| 90 | + if self.network_activity_count == 0 { |
| 91 | + return None; |
| 92 | + } |
| 93 | + let mut denials_by_group: Vec<(String, u32)> = self.denials_by_group.drain().collect(); |
| 94 | + denials_by_group.sort_by(|left, right| left.0.cmp(&right.0)); |
| 95 | + let summary = FlushableActivitySummary { |
| 96 | + network_activity_count: self.network_activity_count, |
| 97 | + denied_action_count: self.denied_action_count, |
| 98 | + denials_by_group, |
| 99 | + }; |
| 100 | + self.network_activity_count = 0; |
| 101 | + self.denied_action_count = 0; |
| 102 | + Some(summary) |
| 103 | + } |
| 104 | +} |
| 105 | + |
| 106 | +pub fn sanitize_deny_group(raw: &str) -> &'static str { |
| 107 | + match raw { |
| 108 | + "connect_policy" | "connect" | "l4_deny" => "connect_policy", |
| 109 | + "forward_policy" | "forward" => "forward_policy", |
| 110 | + "l7_policy" | "l7" | "l7_deny" | "forward-l7-deny" => "l7_policy", |
| 111 | + "l7_parse_rejection" | "parse_rejection" => "l7_parse_rejection", |
| 112 | + "ssrf" => "ssrf", |
| 113 | + "bypass" => "bypass", |
| 114 | + "policy_stale" => "policy_stale", |
| 115 | + _ => "unknown", |
| 116 | + } |
| 117 | +} |
| 118 | + |
| 119 | +#[cfg(test)] |
| 120 | +fn denial_rate_pct(network_activity_count: u32, denied_action_count: u32) -> f64 { |
| 121 | + if network_activity_count == 0 { |
| 122 | + return 0.0; |
| 123 | + } |
| 124 | + ((f64::from(denied_action_count) / f64::from(network_activity_count)) * 100.0).clamp(0.0, 100.0) |
| 125 | +} |
| 126 | + |
| 127 | +#[cfg(test)] |
| 128 | +mod tests { |
| 129 | + use super::*; |
| 130 | + |
| 131 | + fn assert_float_eq(actual: f64, expected: f64) { |
| 132 | + assert!((actual - expected).abs() <= f64::EPSILON); |
| 133 | + } |
| 134 | + |
| 135 | + #[test] |
| 136 | + fn deny_group_sanitization_uses_allowlist() { |
| 137 | + assert_eq!(sanitize_deny_group("connect"), "connect_policy"); |
| 138 | + assert_eq!(sanitize_deny_group("forward-l7-deny"), "l7_policy"); |
| 139 | + assert_eq!(sanitize_deny_group("host=example.test/path"), "unknown"); |
| 140 | + assert_eq!(sanitize_deny_group("acme.internal:443"), "unknown"); |
| 141 | + assert_eq!( |
| 142 | + sanitize_deny_group("binary=/usr/local/bin/private"), |
| 143 | + "unknown" |
| 144 | + ); |
| 145 | + } |
| 146 | + |
| 147 | + #[test] |
| 148 | + fn denial_rate_handles_zero_and_clamps() { |
| 149 | + assert_float_eq(denial_rate_pct(0, 10), 0.0); |
| 150 | + assert_float_eq(denial_rate_pct(4, 1), 25.0); |
| 151 | + assert_float_eq(denial_rate_pct(4, 10), 100.0); |
| 152 | + } |
| 153 | +} |
0 commit comments