diff --git a/lib/opte-test-utils/src/port_state.rs b/lib/opte-test-utils/src/port_state.rs index cd33ea0c..cbe3a1fe 100644 --- a/lib/opte-test-utils/src/port_state.rs +++ b/lib/opte-test-utils/src/port_state.rs @@ -2,7 +2,7 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. -// Copyright 2025 Oxide Computer Company +// Copyright 2026 Oxide Computer Company //! Routines for verifying various Port state. @@ -314,8 +314,10 @@ macro_rules! decr_na { /// assert the port state. #[macro_export] macro_rules! decr { - ($pav:expr, $fields:expr) => { - decr_na!($pav, $fields); + ($pav:expr, $fields_slice:expr) => { + for fields_str in &$fields_slice { + decr_na!($pav, fields_str); + } assert_port!($pav); }; } diff --git a/lib/opte/src/ddi/time.rs b/lib/opte/src/ddi/time.rs index ac996829..27fc1b1a 100644 --- a/lib/opte/src/ddi/time.rs +++ b/lib/opte/src/ddi/time.rs @@ -2,7 +2,7 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. -// Copyright 2025 Oxide Computer Company +// Copyright 2026 Oxide Computer Company //! Moments, periodics, etc. use core::ops::Add; @@ -27,7 +27,7 @@ pub const NANOS: u64 = 1_000_000_000; pub const NANOS_TO_MILLIS: u64 = 1_000_000; /// A moment in time. -#[derive(Clone, Copy, Debug)] +#[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd)] pub struct Moment { #[cfg(all(not(feature = "std"), not(test)))] inner: ddi::hrtime_t, diff --git a/lib/opte/src/engine/flow_table.rs b/lib/opte/src/engine/flow_table.rs index cbbb7d42..fdc21de4 100644 --- a/lib/opte/src/engine/flow_table.rs +++ b/lib/opte/src/engine/flow_table.rs @@ -2,7 +2,7 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. -// Copyright 2025 Oxide Computer Company +// Copyright 2026 Oxide Computer Company //! The flow table implementation. 
//! @@ -10,15 +10,18 @@ //! tables: UFT, LFT, and the TCP Flow Table. use super::packet::InnerFlowId; +use crate::ddi::sync::KRwLock; use crate::ddi::time::MILLIS; use crate::ddi::time::Moment; -use alloc::boxed::Box; use alloc::collections::BTreeMap; +use alloc::collections::BTreeSet; use alloc::ffi::CString; use alloc::string::String; use alloc::sync::Arc; +use alloc::sync::Weak; use alloc::vec::Vec; use core::fmt; +use core::num::NonZeroU16; use core::num::NonZeroU32; use core::sync::atomic::AtomicBool; use core::sync::atomic::AtomicU64; @@ -63,52 +66,167 @@ impl Ttl { } } +/// A metric of how stale a flow entry is, used to determine whether +/// any existing entry can be evicted to make room for a new one. +#[derive(Copy, Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Default)] +pub enum EvictionPriority { + /// The flow is not eligible for eviction. + #[default] + Protected, + /// The flow entry may be evicted to make room for a new one. + /// + /// A numerically larger priority is more eligible for eviction. + Evictable(NonZeroU16), +} + /// A policy for expiring flow table entries over time. -pub trait ExpiryPolicy: fmt::Debug + Send + Sync { +pub trait ExpiryPolicy: fmt::Debug + Send + Sync { /// Returns whether the given flow should be removed, given current flow /// state, the time a packet was last received, and the current time. fn is_expired(&self, entry: &FlowEntry, now: Moment) -> bool; + + /// Returns whether a given flow can be evicted in favour of a new one + /// prior to its expiry time. + /// + /// If so, this function will return `Some(priority)` -- a higher priority + /// is more eligible to be evicted. 
+ fn eviction_priority( + &self, + entry: &FlowEntry, + now: Moment, + ) -> Option; } -impl ExpiryPolicy for Ttl { +impl ExpiryPolicy for Ttl { fn is_expired(&self, entry: &FlowEntry, now: Moment) -> bool { - entry.is_expired(now, *self) + self.is_expired(entry.last_hit(), now) + } + + fn eviction_priority( + &self, + entry: &FlowEntry, + _now: Moment, + ) -> Option { + // TCP flows are expected to have a `TcpFlowEntryState` registered as + // a child or ancestor. If present, this will reduce the eviction + // priority to a level appropriate to how long the flow has been in its + // current state. Otherwise we should assume the flow has closed and + // that this entry is unneeded. + // + // Other flows in these tables have no additional context to suggest + // that they are lingering too long in a given state. + match entry.id().protocol() { + opte_api::Protocol::TCP => { + Some(EvictionPriority::Evictable(NonZeroU16::MAX)) + } + _ => None, + } + } +} + +/// Methods of a [`FlowEntry`] called on related flows across table boundaries. +/// +/// This is only intended to be implemented by [`FlowEntry`], but must be a +/// trait as the state type parameter `S: `[`FlowState`] differs on a table by +/// table basis. +pub trait FlowEntryInfo: fmt::Debug + Send + Sync { + /// Set the last hit time on this entry to `new_time` if it is later + /// than the stored value. + fn inherit_last_hit(&self, new_time: Moment); + + /// Determine whether this flow entry can be evicted to make room for + /// another, recursively checking all children when needed. + fn eviction_priority(&self, now: Moment) -> Option; + + /// Set `self` as a parent node to `child`. + fn push_child(&self, child: &Arc); + + /// Remove `child` from this entry's list of children. + fn remove_child(&self, child: &Arc); + + /// Mark this flow entry, and all those which depend on it for validity, + /// as being invalid. 
+ fn mark_evicted(&self); +} + +impl FlowEntryInfo for FlowEntry { + fn inherit_last_hit(&self, new_time: Moment) { + let new = new_time.raw(); + // An error from the below call implies a concurrent modification with + // a time later than `new_time`. In that case there's just nothing left + // to do here. + _ = self.lifetime.last_hit.try_update( + Ordering::Relaxed, + Ordering::Relaxed, + |prior| (prior < new).then_some(new), + ); + } + + fn eviction_priority(&self, now: Moment) -> Option { + let own_prio = self.policy.eviction_priority(self, now); + + // An explicit signal that this flow is well-behaved wins outright. + if let Some(EvictionPriority::Protected) = own_prio { + return own_prio; + } + + // A priority of `None` tells us nothing, and will eventually resolve + // into `EvictionPriority::Protected` if no explicit priorities are + // provided. + // + // If we have an explicit priority, we keep the most-protected (lowest) + // priority which we know of. + let mut best_prio = own_prio; + for maybe_child in &*self.lifetime.children.read() { + if let Some(child) = maybe_child.0.upgrade() { + match (best_prio, child.eviction_priority(now)) { + (None, a) => best_prio = a, + (Some(_), None) => {} + (Some(old), Some(new)) => best_prio = Some(new.min(old)), + } + } + } + + best_prio + } + + fn push_child(&self, child: &Arc) { + let mut children = self.lifetime.children.write(); + children.insert(ByAddr(Arc::downgrade(child))); + } + + fn remove_child(&self, child: &Arc) { + let mut children = self.lifetime.children.write(); + children.remove(&ByAddr(Arc::downgrade(child))); + } + + fn mark_evicted(&self) { + if !self.lifetime.killed.swap(true, Ordering::Relaxed) { + // Any flow entry is only valid while all of its parents still + // exist. Timeout-driven expiry will not remove an entry while there + // are still live parents, but during eviction we need to go through + // and mark them as invalid in turn. 
+ for maybe_child in &*self.lifetime.children.read() { + if let Some(child) = maybe_child.0.upgrade() { + child.mark_evicted(); + } + } + } } } pub type FlowTableDump = Vec<(InnerFlowId, T)>; #[derive(Debug)] -pub struct FlowTable { +pub struct FlowTable { port_c: CString, name_c: CString, limit: NonZeroU32, - policy: Box>, + policy: Arc>, map: BTreeMap>>, } -impl FlowTable -where - S: fmt::Debug + Dump, -{ - /// Add a new entry to the flow table. - /// - /// # Errors - /// - /// If the table is at max capacity, an error is returned and no - /// modification is made to the table. - /// - /// If an entry already exists for this flow, it is overwritten. - pub fn add(&mut self, flow_id: InnerFlowId, state: S) -> Result<()> { - if self.map.len() == self.limit.get() as usize { - return Err(OpteError::MaxCapacity(self.limit.get() as u64)); - } - - let entry = FlowEntry::new(state); - self.map.insert(flow_id, entry.into()); - Ok(()) - } - +impl FlowTable { /// Add a new entry to the flow table, returning a shared refrence to /// the entry. /// @@ -118,26 +236,45 @@ where /// modification is made to the table. /// /// If an entry already exists for this flow, it is overwritten. - pub fn add_and_return( + pub fn add( &mut self, flow_id: InnerFlowId, state: S, ) -> Result>> { - if self.map.len() == self.limit.get() as usize { - return Err(OpteError::MaxCapacity(self.limit.get() as u64)); - } - - let entry = Arc::new(FlowEntry::new(state)); - self.map.insert(flow_id, entry.clone()); + self.check_for_space()?; + let entry = Arc::new(FlowEntry::new(flow_id, state, self)); + self.map.insert(flow_id, Arc::clone(&entry)); Ok(entry) } /// Add a new entry to the flow table while eliding the capacity check. /// /// This is meant for table implementations that enforce their own limit. 
- pub fn add_unchecked(&mut self, flow_id: InnerFlowId, state: S) { - let entry = FlowEntry::new(state); - self.map.insert(flow_id, entry.into()); + pub fn add_unchecked( + &mut self, + flow_id: InnerFlowId, + state: S, + ) -> Arc> { + let entry = Arc::new(FlowEntry::new(flow_id, state, self)); + self.map.insert(flow_id, Arc::clone(&entry)); + entry + } + + /// Add a new entry to the flow table, assigning it the same lifetime as + /// an existing entry in another table. + /// + /// As in [`Self::add_unchecked`], this elides the capacity check. + pub fn add_unchecked_partner( + &mut self, + flow_id: InnerFlowId, + state: S, + partner: &FlowEntry, + ) -> Arc> { + let mut entry = FlowEntry::new(flow_id, state, self); + entry.lifetime = Arc::clone(&partner.lifetime); + let entry = Arc::new(entry); + self.map.insert(flow_id, Arc::clone(&entry)); + entry } // Clear all entries from the flow table. @@ -155,7 +292,10 @@ where pub fn expire(&mut self, flowid: &InnerFlowId) { flow_expired_probe(&self.port_c, &self.name_c, flowid, None, None); - self.map.remove(flowid); + if let Some(entry) = self.map.remove(flowid) { + entry.propagate_last_hit(); + entry.mark_evicted(); + } } pub fn expire_flows(&mut self, now: Moment, f: F) -> Vec @@ -167,33 +307,117 @@ where let mut expired = vec![]; self.map.retain(|flowid, entry| { - if self.policy.is_expired(entry, now) { + // A flow cannot be expired by the timer while it still has children + // relying upon its existence. Check whether any remain, and remove + // dangling references to child entries which have expired. + { + // We have a write lock on the port, so there shouldn't be + // contention here. 
+ let mut children = entry.lifetime.children.write(); + children.retain(|el| el.0.upgrade().is_some()); + if !children.is_empty() { + return true; + } + } + if entry.is_expired(now) { + let my_time = entry.last_hit(); flow_expired_probe( port_c, name_c, flowid, - Some(entry.last_hit.load(Ordering::Relaxed)), + Some(my_time.raw_millis()), Some(now.raw_millis()), ); + entry.propagate_last_hit(); expired.push(f(entry.state())); return false; } - true + !entry.is_killed() }); expired } + /// Determine whether there is currently space for a new entry to be + /// inserted. + /// + /// If out of space, this method will attempt to evict an existing entry. + pub fn check_for_space(&mut self) -> Result<()> { + if self.map.len() < self.limit.get() as usize { + return Ok(()); + } + + if let Some((key, _)) = self.find_evictable_entry() { + self.expire(&key); + Ok(()) + } else { + Err(OpteError::MaxCapacity(self.limit.get() as u64)) + } + } + + /// Select the flow entry most eligible for eviction (i.e., having the + /// numerically highest priority and the oldest timestamp). + /// + /// Entries which have been killed due to the loss of a dependency will be + /// used where possible. + pub fn find_evictable_entry(&self) -> Option<(InnerFlowId, &FlowEntry)> { + let now = Moment::now(); + + // TODO: some form of datastructure to accelerate this? + // Who would be responsible for keeping that up to date? + // If that cache is wrong, we're just hitting the O(n) scan anyhow. + + let mut to_evict = None; + for (key, entry) in self.map.iter() { + if entry.is_killed() { + to_evict = Some((EvictionKey::Dead, *key, entry)); + break; + } + + // If we have no information, then default to preserving the flow. 
+ let prio = entry.eviction_priority(now).unwrap_or_default(); + if let EvictionPriority::Protected = prio { + continue; + } + + let last_hit = entry.last_hit(); + + match to_evict { + None => { + to_evict = Some(( + EvictionKey::Evictable(prio, last_hit), + *key, + entry, + )) + } + Some((EvictionKey::Evictable(curr_prio, curr_time), ..)) + if prio > curr_prio + || (prio == curr_prio && last_hit < curr_time) => + { + to_evict = Some(( + EvictionKey::Evictable(prio, last_hit), + *key, + entry, + )); + } + Some(_) => {} + } + } + + to_evict.map(|(_, k, v)| (k, v.as_ref())) + } + /// Get the maximum number of entries this flow table may hold. pub fn get_limit(&self) -> NonZeroU32 { self.limit } - /// Get a reference to the flow entry for a given flow, if one - /// exists. + /// Get a reference to the flow entry for a given flow, if one exists. pub fn get(&self, flow_id: &InnerFlowId) -> Option<&Arc>> { - self.map.get(flow_id) + // Flows which are marked as `killed` no longer really exist, but they + // have not yet been reaped. + self.map.get(flow_id).and_then(|v| (!v.is_killed()).then_some(v)) } /// Mark all flow table entries as requiring revalidation after a @@ -211,9 +435,9 @@ where port: &str, name: &str, limit: NonZeroU32, - policy: Option>>, + policy: Option>>, ) -> FlowTable { - let policy = policy.unwrap_or_else(|| Box::new(FLOW_DEF_TTL)); + let policy = policy.unwrap_or_else(|| Arc::new(FLOW_DEF_TTL)); Self { port_c: CString::new(port).unwrap(), @@ -232,6 +456,12 @@ where pub fn remove(&mut self, flow: &InnerFlowId) -> Option>> { self.map.remove(flow) } + + pub fn iter( + &self, + ) -> impl Iterator>)> { + self.map.iter() + } } #[allow(unused_variables)] @@ -266,36 +496,104 @@ fn flow_expired_probe( /// A type that can be "dumped" for the purposes of presenting an /// external view into internal state of the [`FlowEntry`]. 
-pub trait Dump { +pub trait Dump: fmt::Debug + Send + Sync { type DumpVal: DeserializeOwned + Serialize; fn dump(&self, hits: u64) -> Self::DumpVal; } +/// Common functions needed from the interior state of a flow table entry. +pub trait FlowState: Dump { + /// Return an iterator containing references to all flow entries from other + /// tables which underpin `self`. + fn parents(&self) -> impl Iterator> { + [].into_iter() + } +} + +/// Lifecycle state for a flow entry or set of interlinked flow entries. +struct FlowLifetime { + /// This tracks the last time the flow was matched. + /// + /// These are raw u64s sourced from a `Moment`, which tracks time + /// in nanoseconds. + last_hit: AtomicU64, + + /// Whether this flow entry has been explicitly removed. + killed: AtomicBool, + + /// Entries in remote tables which rely on the continued existence of + /// this flow. + /// + /// Child entries can also provide a flow with a measure of whether + /// it is eligible for eviction. + children: KRwLock>, +} + +impl fmt::Debug for FlowLifetime { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let FlowLifetime { last_hit, killed, children: _ } = self; + f.debug_struct("FlowLifetime") + .field("last_hit", last_hit) + .field("killed", killed) + .field("children", &"") + .finish() + } +} + +/// Helper newtype to deduplicate child flow entries by address. +struct ByAddr(Weak); + +impl PartialEq for ByAddr { + fn eq(&self, other: &Self) -> bool { + core::ptr::addr_eq(self.0.as_ptr(), other.0.as_ptr()) + } +} + +impl Eq for ByAddr {} + +impl Ord for ByAddr { + fn cmp(&self, other: &Self) -> core::cmp::Ordering { + self.0.as_ptr().cast::<()>().cmp(&other.0.as_ptr().cast::<()>()) + } +} + +impl PartialOrd for ByAddr { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + /// The FlowEntry holds any arbitrary state type `S`.
#[derive(Debug)] -pub struct FlowEntry { +pub struct FlowEntry { + /// The 5-tuple of this flow, used as the lookup key in the parent map. + id: InnerFlowId, + state: S, /// Number of times this flow has been matched. hits: AtomicU64, - /// This tracks the last time the flow was matched. - /// - /// These are raw u64s sourced from a `Moment`, which tracks time - /// in nanoseconds. - last_hit: AtomicU64, + /// State determining whether this flow can be expired or evicted. + lifetime: Arc, /// Records whether this flow predates a rule change, and /// must rerun rule processing before `state` can be used. dirty: AtomicBool, + + policy: Arc>, } -impl FlowEntry { +impl FlowEntry { fn dump(&self) -> S::DumpVal { self.state.dump(self.hits.load(Ordering::Relaxed)) } + pub fn id(&self) -> &InnerFlowId { + &self.id + } + pub fn state_mut(&mut self) -> &mut S { &mut self.state } @@ -325,7 +623,7 @@ impl FlowEntry { /// the port lock.** pub(crate) fn hit_at(&self, now: Moment) { self.hits.fetch_add(1, Ordering::Relaxed); - self.last_hit.store(now.raw(), Ordering::Relaxed); + self.lifetime.last_hit.store(now.raw(), Ordering::Relaxed); } pub fn is_dirty(&self) -> bool { @@ -341,19 +639,41 @@ impl FlowEntry { } pub fn last_hit(&self) -> Moment { - Moment::from_raw_nanos(self.last_hit.load(Ordering::Relaxed)) + Moment::from_raw_nanos(self.lifetime.last_hit.load(Ordering::Relaxed)) + } + + /// Returns whether this flow entry has explicitly been marked as invalid + /// (e.g., one of its ancestors has been evicted). + fn is_killed(&self) -> bool { + self.lifetime.killed.load(Ordering::Relaxed) + } + + /// Returns whether this flow entry is past its policy's expiry time. + fn is_expired(&self, now: Moment) -> bool { + self.policy.is_expired(self, now) } - fn is_expired(&self, now: Moment, ttl: Ttl) -> bool { - ttl.is_expired(self.last_hit(), now) + /// Update the last hit time of this flow entry's parents if it has been + /// used more recently. 
+ fn propagate_last_hit(&self) { + let my_time = self.last_hit(); + for parent in self.state.parents() { + parent.inherit_last_hit(my_time); + } } - fn new(state: S) -> Self { + fn new(id: InnerFlowId, state: S, in_table: &FlowTable) -> Self { FlowEntry { + id, state, hits: 0.into(), - last_hit: Moment::now().raw().into(), dirty: false.into(), + policy: Arc::clone(&in_table.policy), + lifetime: Arc::new(FlowLifetime { + last_hit: Moment::now().raw().into(), + killed: false.into(), + children: KRwLock::new(BTreeSet::new()), + }), } } } @@ -381,10 +701,10 @@ unsafe extern "C" { ); } -impl Dump for () { - type DumpVal = (); - - fn dump(&self, _hits: u64) {} +/// A score of how likely we are to evict a given flow. +enum EvictionKey { + Dead, + Evictable(EvictionPriority, Moment), } #[cfg(test)] @@ -396,8 +716,53 @@ mod test { use crate::engine::packet::FLOW_ID_DEFAULT; use core::time::Duration; + impl Dump for () { + type DumpVal = (); + + fn dump(&self, _hits: u64) {} + } + + impl FlowState for () {} + + #[derive(Debug, Clone)] + struct ParentSet(Vec>); + + impl Dump for ParentSet { + type DumpVal = (); + + fn dump(&self, _hits: u64) {} + } + + impl FlowState for ParentSet { + fn parents(&self) -> impl Iterator> { + self.0.iter().cloned() + } + } + pub const FT_SIZE: Option = NonZeroU32::new(16); + #[derive(Debug, Clone)] + struct FixedPolicy { + time: Duration, + default: Option, + manual: BTreeMap, + } + + impl ExpiryPolicy for FixedPolicy { + fn is_expired(&self, entry: &FlowEntry, now: Moment) -> bool { + entry.last_hit().delta_as_millis(now) + >= (self.time.as_millis() as u64) + } + + fn eviction_priority( + &self, + entry: &FlowEntry, + _now: Moment, + ) -> Option { + self.manual.get(entry.id()).copied().or(self.default) + } + } + #[test] fn flow_expired() { let flowid = InnerFlowId { @@ -442,4 +807,310 @@ mod test { ft.clear(); assert_eq!(ft.num_flows(), 0); } + + #[test] + fn children_prevent_timer_expiry() { + let flowid = InnerFlowId { + proto: 
Protocol::TCP.into(), + addrs: AddrPair::V4 { + src: "192.168.2.10".parse().unwrap(), + dst: "76.76.21.21".parse().unwrap(), + }, + proto_info: PortInfo { src_port: 37890, dst_port: 443 }.into(), + }; + + let mut ft1 = + FlowTable::new("port", "parent-table", FT_SIZE.unwrap(), None); + let mut ft2 = + FlowTable::new("port", "child-table", FT_SIZE.unwrap(), None); + let fe1 = ft1.add(flowid, ()).unwrap(); + let fe2 = ft2.add(flowid, ()).unwrap(); + + let now = fe2.last_hit(); + fe1.push_child(&(fe2 as Arc)); + + // A flow entry cannot be removed by the timer until all its children + // have been evicted or expired. + let t2 = now + Duration::new(FLOW_DEF_EXPIRE_SECS, 0); + ft1.expire_flows(t2, |_| FLOW_ID_DEFAULT); + assert_eq!(ft1.num_flows(), 1); + + // If we go via ft2 first, we will be able to remove its entries (which + // have no children), which in turn makes ft1's entries available for + // eviction. + ft2.expire_flows(t2, |_| FLOW_ID_DEFAULT); + assert_eq!(ft2.num_flows(), 0); + ft1.expire_flows(t2, |_| FLOW_ID_DEFAULT); + assert_eq!(ft1.num_flows(), 0); + } + + #[test] + fn timer_expiry_propagation() { + let flowid = InnerFlowId { + proto: Protocol::TCP.into(), + addrs: AddrPair::V4 { + src: "192.168.2.10".parse().unwrap(), + dst: "76.76.21.21".parse().unwrap(), + }, + proto_info: PortInfo { src_port: 37890, dst_port: 443 }.into(), + }; + + let mut ft1 = + FlowTable::new("port", "parent-table", FT_SIZE.unwrap(), None); + let mut ft2 = + FlowTable::new("port", "child-table", FT_SIZE.unwrap(), None); + let fe1 = ft1.add(flowid, ()).unwrap(); + let fe2 = + ft2.add(flowid, ParentSet(vec![fe1.clone() as Arc<_>])).unwrap(); + + let t1 = fe2.last_hit(); + fe1.push_child(&(fe2.clone() as Arc)); + + // Updating the last-hit time of a flow won't immediately propagate to + // its parents. + let t2 = t1 + Duration::from_secs(10); + fe2.hit_at(t2); + assert!(fe1.last_hit() <= t1); + + // When fe2 is expired, it should pass on its last-hit time to fe1.
+ let t3 = t2 + Duration::new(FLOW_DEF_EXPIRE_SECS, 0); + ft2.expire_flows(t3, |_| FLOW_ID_DEFAULT); + assert_eq!(ft2.num_flows(), 0); + assert_eq!(fe1.last_hit(), t2); + } + + #[test] + fn eviction_basics() { + let flowid = InnerFlowId { + proto: Protocol::TCP.into(), + addrs: AddrPair::V4 { + src: "192.168.2.10".parse().unwrap(), + dst: "76.76.21.21".parse().unwrap(), + }, + proto_info: PortInfo { src_port: 37890, dst_port: 443 }.into(), + }; + + // Fill up the tables. + let mut default_ft = + FlowTable::new("port", "no-prio-table", FT_SIZE.unwrap(), None); + let mut evict_ft = FlowTable::new( + "port", + "prio-table", + FT_SIZE.unwrap(), + Some(Arc::new(FixedPolicy { + time: Duration::from_secs(FLOW_DEF_EXPIRE_SECS), + default: Some(EvictionPriority::Evictable(NonZeroU16::MIN)), + manual: Default::default(), + })), + ); + for i in 0..default_ft.limit.get() { + let new_id = InnerFlowId { + proto: Protocol::UDP.into(), + addrs: AddrPair::V4 { + src: "192.168.2.10".parse().unwrap(), + dst: "76.76.21.21".parse().unwrap(), + }, + proto_info: PortInfo { src_port: i as u16, dst_port: 443 } + .into(), + }; + default_ft.add(new_id, ()).unwrap(); + evict_ft.add(new_id, ()).unwrap(); + } + + // With the default policy and a table full of UDP entries, we can't make + // room for anything new. + assert!(default_ft.add(flowid, ()).is_err()); + assert_eq!(default_ft.num_flows(), FT_SIZE.unwrap().get()); + + // On a table where every flow is evictable, we can! + assert!(evict_ft.add(flowid, ()).is_ok()); + assert_eq!(evict_ft.num_flows(), FT_SIZE.unwrap().get()); + + // If we soft-kill a flow entry (i.e., one of its ancestors was evicted) + // then we can make room to insert a new one. 
+ default_ft.map.values().next().unwrap().mark_evicted(); + assert_eq!(default_ft.num_flows(), FT_SIZE.unwrap().get()); + assert!(default_ft.add(flowid, ()).is_ok()); + assert_eq!(default_ft.num_flows(), FT_SIZE.unwrap().get()); + } + + #[test] + fn eviction_selects_highest_prio() { + let flowid = InnerFlowId { + proto: Protocol::TCP.into(), + addrs: AddrPair::V4 { + src: "192.168.2.10".parse().unwrap(), + dst: "76.76.21.21".parse().unwrap(), + }, + proto_info: PortInfo { src_port: 37890, dst_port: 443 }.into(), + }; + + let sacrificial_flow = InnerFlowId { + proto: Protocol::UDP.into(), + addrs: AddrPair::V4 { + src: "192.168.2.10".parse().unwrap(), + dst: "76.76.21.21".parse().unwrap(), + }, + proto_info: PortInfo { src_port: 5, dst_port: 443 }.into(), + }; + + let mut evict_ft = FlowTable::new( + "port", + "prio-table", + FT_SIZE.unwrap(), + Some(Arc::new(FixedPolicy { + time: Duration::from_secs(FLOW_DEF_EXPIRE_SECS), + default: Some(EvictionPriority::Evictable(NonZeroU16::MIN)), + manual: vec![( + sacrificial_flow, + EvictionPriority::Evictable(16.try_into().unwrap()), + )] + .into_iter() + .collect(), + })), + ); + for i in 0..evict_ft.limit.get() { + let new_id = InnerFlowId { + proto: Protocol::UDP.into(), + addrs: AddrPair::V4 { + src: "192.168.2.10".parse().unwrap(), + dst: "76.76.21.21".parse().unwrap(), + }, + proto_info: PortInfo { src_port: i as u16, dst_port: 443 } + .into(), + }; + evict_ft.add(new_id, ()).unwrap(); + } + + // We've set this table up so that one of these flows will have a + // far higher eviction priority than its neighbours. This could + // be driven by protocol, by time, or some other aspect of flow state. + // + // This is the entry we will evict, regardless of the age of all others. 
+ assert!(evict_ft.map.contains_key(&sacrificial_flow)); + assert!(evict_ft.add(flowid, ()).is_ok()); + assert_eq!(evict_ft.num_flows(), FT_SIZE.unwrap().get()); + assert!(!evict_ft.map.contains_key(&sacrificial_flow)); + } + + #[test] + fn eviction_invalidates_descendents() { + let flowid = InnerFlowId { + proto: Protocol::TCP.into(), + addrs: AddrPair::V4 { + src: "192.168.2.10".parse().unwrap(), + dst: "76.76.21.21".parse().unwrap(), + }, + proto_info: PortInfo { src_port: 37890, dst_port: 443 }.into(), + }; + + let mut ft1 = + FlowTable::new("port", "parent-table", FT_SIZE.unwrap(), None); + let mut ft2 = + FlowTable::new("port", "child-table", FT_SIZE.unwrap(), None); + let mut ft2_2 = + FlowTable::new("port", "other-child-table", FT_SIZE.unwrap(), None); + let mut ft3 = + FlowTable::new("port", "grandchild-table", FT_SIZE.unwrap(), None); + let fe1 = ft1.add(flowid, ()).unwrap(); + let fe2 = ft2.add(flowid, ()).unwrap(); + let fe_out_of_chain = ft2_2.add(flowid, ()).unwrap(); + let fe3 = ft3.add(flowid, ()).unwrap(); + + fe1.push_child(&(fe2.clone() as Arc)); + fe2.push_child(&(fe3.clone() as Arc)); + fe_out_of_chain.push_child(&(fe3.clone() as Arc)); + + // If we invalidate fe1, then all of its *direct descendants* will also + // be invalid. 
+ fe1.mark_evicted(); + assert!(fe1.is_killed()); + assert!(fe2.is_killed()); + assert!(fe3.is_killed()); + assert!(!fe_out_of_chain.is_killed()); + } + + #[test] + fn eviction_priority_inheritance() { + let flowid = InnerFlowId { + proto: Protocol::UDP.into(), + addrs: AddrPair::V4 { + src: "192.168.2.10".parse().unwrap(), + dst: "76.76.21.21".parse().unwrap(), + }, + proto_info: PortInfo { src_port: 37890, dst_port: 443 }.into(), + }; + + let mut ft1 = + FlowTable::new("port", "parent-table", FT_SIZE.unwrap(), None); + let mut ft2 = FlowTable::new( + "port", + "child-table", + FT_SIZE.unwrap(), + Some(Arc::new(FixedPolicy { + time: Duration::from_secs(FLOW_DEF_EXPIRE_SECS), + default: Some(EvictionPriority::Evictable(NonZeroU16::MAX)), + manual: Default::default(), + })), + ); + let mut ft2_2 = FlowTable::new( + "port", + "other-child-table", + FT_SIZE.unwrap(), + Some(Arc::new(FixedPolicy { + time: Duration::from_secs(FLOW_DEF_EXPIRE_SECS), + default: Some(EvictionPriority::Evictable(NonZeroU16::MIN)), + manual: Default::default(), + })), + ); + let mut ft2_3 = FlowTable::new( + "port", + "other-other-child-table", + FT_SIZE.unwrap(), + Some(Arc::new(FixedPolicy { + time: Duration::from_secs(FLOW_DEF_EXPIRE_SECS), + default: Some(EvictionPriority::Protected), + manual: Default::default(), + })), + ); + + let fe1 = ft1.add(flowid, ()).unwrap(); + let fe2 = ft2.add(flowid, ()).unwrap(); + let fe2_2 = ft2_2.add(flowid, ()).unwrap(); + let fe2_3 = ft2_3.add(flowid, ()).unwrap(); + + // By default, we have no entry expressing a preference. + let now = fe2_3.last_hit(); + assert_eq!(fe1.eviction_priority(now), None); + + // When an explicit priority is requested by any descendent, we choose the + // strongest requirement that the flow remain in place. Each flow we push + // here makes the flow less likely for eviction. 
+ fe1.push_child(&(fe2.clone() as Arc)); + assert_eq!( + fe1.eviction_priority(now), + Some(EvictionPriority::Evictable(NonZeroU16::MAX)), + ); + + fe1.push_child(&(fe2_2.clone() as Arc)); + assert_eq!( + fe1.eviction_priority(now), + Some(EvictionPriority::Evictable(NonZeroU16::MIN)), + ); + + fe1.push_child(&(fe2_3.clone() as Arc)); + assert_eq!( + fe1.eviction_priority(now), + Some(EvictionPriority::Protected), + ); + + // Removal of any entry (i.e., because that entry expired) should result + // in a corresponding change to the flow priority. + fe1.remove_child(&(fe2_3 as Arc)); + fe1.remove_child(&(fe2_2 as Arc)); + assert_eq!( + fe1.eviction_priority(now), + Some(EvictionPriority::Evictable(NonZeroU16::MAX)), + ); + } } diff --git a/lib/opte/src/engine/layer.rs b/lib/opte/src/engine/layer.rs index 643da97e..2a49f1b1 100644 --- a/lib/opte/src/engine/layer.rs +++ b/lib/opte/src/engine/layer.rs @@ -39,6 +39,7 @@ use crate::ddi::kstat::KStatProvider; use crate::ddi::kstat::KStatU64; use crate::ddi::mblk::MsgBlk; use crate::ddi::time::Moment; +use crate::engine::flow_table::FlowState; use alloc::ffi::CString; use alloc::string::String; use alloc::string::ToString; @@ -49,6 +50,8 @@ use core::fmt; use core::fmt::Display; use core::num::NonZeroU32; use core::result; +use core::sync::atomic::AtomicU64; +use core::sync::atomic::Ordering; use illumos_sys_hdrs::c_char; use illumos_sys_hdrs::uintptr_t; use opte_api::ActionDescEntryDump; @@ -182,6 +185,8 @@ impl Dump for LftOutEntry { } } +impl FlowState for LftOutEntry {} + struct LayerFlowTable { limit: NonZeroU32, count: u32, @@ -196,18 +201,25 @@ pub struct LftDump { } impl LayerFlowTable { + #[must_use] fn add_pair( &mut self, action_desc: ActionDescEntry, in_flow: InnerFlowId, out_flow: InnerFlowId, - ) { - // We add unchekced because the limit is now enforced by - // LayerFlowTable, not the individual flow tables. 
- self.ft_in.add_unchecked(in_flow, action_desc.clone()); + ) -> (Arc>, Arc>) { + // We add unchecked because the limit is now enforced by + // LayerFlowTable, not the individual flow tables. Because these flow + // entries must be retired in pairs (and we want, e.g., inbound flow + // hits to sustain the outbound flow and vice-versa) we create the out + // entry as a _partner_ of the inbound one. + let in_lft = self.ft_in.add_unchecked(in_flow, action_desc.clone()); let out_entry = LftOutEntry { in_flow_pair: in_flow, action_desc }; - self.ft_out.add_unchecked(out_flow, out_entry); + let out_lft = + self.ft_out.add_unchecked_partner(out_flow, out_entry, &in_lft); self.count += 1; + + (in_lft, out_lft) } /// Clear all flow table entries. @@ -222,18 +234,8 @@ impl LayerFlowTable { } fn expire_flows(&mut self, now: Moment) { - // XXX The two sides can have different traffic patterns and - // thus one side could be considered expired while the other - // is active. You could have one side seeing packets while the - // other side is idle; so what do we do? Currently this impl - // bases expiration on the outgoing side only, but expires - // both entries (just like it's imperative to add an entry as - // a pair, it's also imperative to remove an entry as a pair). - // Perhaps the two sides should share a single moment (though - // that would required mutex or atomic). Or perhaps both sides - // should be checked, and if either side is expired the pair - // is considered expired (or active). Maybe this should be - // configurable? + // Flow table in/out entries share a lifetime struct, so it's irrelevant + // which of these tables we check first. 
let to_expire = self.ft_out.expire_flows(now, LftOutEntry::extract_pair); for flow in to_expire { @@ -242,14 +244,14 @@ impl LayerFlowTable { self.count = self.ft_out.num_flows(); } - fn get_in(&self, flow: &InnerFlowId) -> EntryState { + fn get_in(&self, flow: &InnerFlowId) -> EntryState<'_, ActionDescEntry> { match self.ft_in.get(flow) { Some(entry) => { entry.hit(); if entry.is_dirty() { - EntryState::Dirty(entry.state().clone()) + EntryState::Dirty(entry) } else { - EntryState::Clean(entry.state().clone()) + EntryState::Clean(entry) } } @@ -257,15 +259,14 @@ impl LayerFlowTable { } } - fn get_out(&self, flow: &InnerFlowId) -> EntryState { + fn get_out(&self, flow: &InnerFlowId) -> EntryState<'_, LftOutEntry> { match self.ft_out.get(flow) { Some(entry) => { entry.hit(); - let action = entry.state().action_desc.clone(); if entry.is_dirty() { - EntryState::Dirty(action) + EntryState::Dirty(entry) } else { - EntryState::Clean(action) + EntryState::Clean(entry) } } @@ -331,14 +332,14 @@ impl LayerFlowTable { } /// The result of a flowtable lookup. -pub enum EntryState { +pub enum EntryState<'a, S: FlowState> { /// No flow entry was found matching a given flowid. None, /// An existing flow table entry was found. - Clean(ActionDescEntry), + Clean(&'a Arc>), /// An existing flow table entry was found, but rule processing must be rerun /// to use the original action or invalidate the underlying entry. - Dirty(ActionDescEntry), + Dirty(&'a Arc>), } /// The default action of a layer. @@ -398,6 +399,8 @@ impl Display for ActionDescEntry { } } +impl FlowState for ActionDescEntry {} + /// The actions of a layer. 
/// /// This describes the actions a layer's rules can take as well as the @@ -491,6 +494,11 @@ struct LayerStats { set_rules_called: KStatU64, } +enum SpaceCreated { + AmpleSpace, + Evict { in_key: InnerFlowId, out_key: InnerFlowId }, +} + pub struct Layer { port_c: CString, name: &'static str, @@ -782,6 +790,43 @@ impl Layer { } } + /// Determine whether there is currently space for a new entry to be + /// inserted. + /// + /// If out of space, this method will attempt to evict an existing entry. + fn check_for_space( + &self, + dir: Direction, + ) -> result::Result { + if self.ft.count < self.ft.limit.get() { + return Ok(SpaceCreated::AmpleSpace); + } + + // Both in/out share the same `killed` flag, children, and evictability, + // so we only need to check the outbound table. + if let Some((out_key, out_entry)) = + self.ft.ft_out.find_evictable_entry() + { + let in_key = out_entry.state().in_flow_pair; + Ok(SpaceCreated::Evict { out_key, in_key }) + } else { + let stat = match dir { + Direction::In => &self.stats.vals.in_lft_full, + Direction::Out => &self.stats.vals.out_lft_full, + }; + stat.incr(1); + Err(LayerError::FlowTableFull { layer: self.name, dir }) + } + } + + fn complete_eviction(&mut self, entry: SpaceCreated) { + if let SpaceCreated::Evict { in_key, out_key } = entry { + self.ft.ft_out.expire(&out_key); + self.ft.ft_in.expire(&in_key); + self.ft.count = self.ft.ft_out.num_flows(); + } + } + pub(crate) fn process( &mut self, ectx: &ExecCtx, @@ -816,19 +861,26 @@ impl Layer { // Do we have a FlowTable entry? If so, use it. 
let flow = *pkt.flow(); let action = match self.ft.get_in(&flow) { - EntryState::Dirty(ActionDescEntry::Desc(action)) - if action.is_valid() => - { - self.ft.mark_clean(Direction::In, &flow); - Some(ActionDescEntry::Desc(action)) + EntryState::Dirty(action) => { + if let ActionDescEntry::Desc(desc) = action.state() + && desc.is_valid() + { + let desc = Arc::clone(desc); + pkt.record_lft(Arc::clone(action) as _); + self.ft.mark_clean(Direction::In, &flow); + Some(ActionDescEntry::Desc(desc)) + } else { + // NoOps are included in this case as we can't ask the actor + // whether it remains valid: the simplest method to do so is + // to rerun lookup. + self.ft.remove_in(&flow); + None + } } - EntryState::Dirty(_) => { - // NoOps are included in this case as we can't ask the actor whether - // it remains valid: the simplest method to do so is to rerun lookup. - self.ft.remove_in(&flow); - None + EntryState::Clean(action) => { + pkt.record_lft(Arc::clone(action) as _); + Some(action.state().clone()) } - EntryState::Clean(action) => Some(action), EntryState::None => None, }; @@ -895,13 +947,8 @@ impl Layer { Action::Allow => Ok(LayerResult::Allow), Action::StatefulAllow => { - if self.ft.count == self.ft.limit.get() { - self.stats.vals.in_lft_full += 1; - return Err(LayerError::FlowTableFull { - layer: self.name, - dir: In, - }); - } + let write_to = self.check_for_space(In)?; + self.complete_eviction(write_to); // The outbound flow ID mirrors the inbound. Remember, // the "top" of layer represents how the client sees @@ -909,7 +956,8 @@ impl Layer { // represents how the network sees the traffic. let flow_out = pkt.flow().mirror(); let desc = ActionDescEntry::NoOp; - self.ft.add_pair(desc, *pkt.flow(), flow_out); + let (in_lft, _) = self.ft.add_pair(desc, *pkt.flow(), flow_out); + pkt.record_lft(in_lft); self.stats.vals.flows += 1; Ok(LayerResult::Allow) } @@ -1005,13 +1053,7 @@ impl Layer { // In general, the semantic of a StatefulAction is // that it gets an FT entry. 
If there are no slots // available, then we must fail until one opens up. - if self.ft.count == self.ft.limit.get() { - self.stats.vals.in_lft_full += 1; - return Err(LayerError::FlowTableFull { - layer: self.name, - dir: In, - }); - } + let write_to = self.check_for_space(In)?; let desc = match action.gen_desc(pkt.flow(), pkt, ameta) { Ok(aord) => match aord { @@ -1031,6 +1073,8 @@ impl Layer { } }; + self.complete_eviction(write_to); + let flow_before = *pkt.flow(); let ht_in = desc.gen_ht(In, ameta); pkt.hdr_transform(&ht_in)?; @@ -1058,11 +1102,12 @@ impl Layer { // The final step is to mirror the IPs and ports to // reflect the traffic direction change. let flow_out = pkt.flow().mirror(); - self.ft.add_pair( + let (in_lft, _) = self.ft.add_pair( ActionDescEntry::Desc(desc), flow_before, flow_out, ); + pkt.record_lft(in_lft); self.stats.vals.flows += 1; Ok(LayerResult::Allow) } @@ -1102,19 +1147,26 @@ impl Layer { // Do we have a FlowTable entry? If so, use it. let flow = *pkt.flow(); let action = match self.ft.get_out(&flow) { - EntryState::Dirty(ActionDescEntry::Desc(action)) - if action.is_valid() => - { - self.ft.mark_clean(Direction::Out, &flow); - Some(ActionDescEntry::Desc(action)) + EntryState::Dirty(action) => { + if let ActionDescEntry::Desc(desc) = &action.state().action_desc + && desc.is_valid() + { + let desc = Arc::clone(desc); + pkt.record_lft(Arc::clone(action) as _); + self.ft.mark_clean(Direction::Out, &flow); + Some(ActionDescEntry::Desc(desc)) + } else { + // NoOps are included in this case as we can't ask the actor + // whether it remains valid: the simplest method to do so is + // to rerun lookup. + self.ft.remove_out(&flow); + None + } } - EntryState::Dirty(_) => { - // NoOps are included in this case as we can't ask the actor whether - // it remains valid: the simplest method to do so is to rerun lookup. 
- self.ft.remove_out(&flow); - None + EntryState::Clean(action) => { + pkt.record_lft(Arc::clone(action) as _); + Some(action.state().action_desc.clone()) } - EntryState::Clean(action) => Some(action), EntryState::None => None, }; @@ -1181,13 +1233,8 @@ impl Layer { Action::Allow => Ok(LayerResult::Allow), Action::StatefulAllow => { - if self.ft.count == self.ft.limit.get() { - self.stats.vals.out_lft_full += 1; - return Err(LayerError::FlowTableFull { - layer: self.name, - dir: Out, - }); - } + let write_to = self.check_for_space(Out)?; + self.complete_eviction(write_to); // The inbound flow ID must be calculated _after_ the // header transformation. Remember, the "top" @@ -1197,7 +1244,12 @@ impl Layer { // The final step is to mirror the IPs and ports to // reflect the traffic direction change. let flow_in = pkt.flow().mirror(); - self.ft.add_pair(ActionDescEntry::NoOp, flow_in, *pkt.flow()); + let (_, out_lft) = self.ft.add_pair( + ActionDescEntry::NoOp, + flow_in, + *pkt.flow(), + ); + pkt.record_lft(out_lft); self.stats.vals.flows += 1; Ok(LayerResult::Allow) } @@ -1293,13 +1345,7 @@ impl Layer { // In general, the semantic of a StatefulAction is // that it gets an FT entry. If there are no slots // available, then we must fail until one opens up. - if self.ft.count == self.ft.limit.get() { - self.stats.vals.out_lft_full += 1; - return Err(LayerError::FlowTableFull { - layer: self.name, - dir: Out, - }); - } + let write_to = self.check_for_space(Out)?; let desc = match action.gen_desc(pkt.flow(), pkt, ameta) { Ok(aord) => match aord { @@ -1319,6 +1365,8 @@ impl Layer { } }; + self.complete_eviction(write_to); + let flow_before = *pkt.flow(); let ht_out = desc.gen_ht(Out, ameta); pkt.hdr_transform(&ht_out)?; @@ -1347,11 +1395,12 @@ impl Layer { // to mirror the IPs and ports to reflect the traffic // direction change. 
let flow_in = pkt.flow().mirror(); - self.ft.add_pair( + let (_, out_lft) = self.ft.add_pair( ActionDescEntry::Desc(desc), flow_in, flow_before, ); + pkt.record_lft(out_lft); self.stats.vals.flows += 1; Ok(LayerResult::Allow) } @@ -1521,13 +1570,17 @@ impl Layer { #[derive(Debug)] struct RuleTableEntry { id: RuleId, - hits: u64, + hits: AtomicU64, rule: Rule, } impl From<&RuleTableEntry> for RuleTableEntryDump { fn from(rte: &RuleTableEntry) -> Self { - Self { id: rte.id, hits: rte.hits, rule: RuleDump::from(&rte.rule) } + Self { + id: rte.id, + hits: rte.hits.load(Ordering::Relaxed), + rule: RuleDump::from(&rte.rule), + } } } @@ -1555,12 +1608,14 @@ impl RuleTable { fn add(&mut self, rule: Rule) { match self.find_pos(&rule) { RulePlace::End => { - let rte = RuleTableEntry { id: self.next_id, hits: 0, rule }; + let rte = + RuleTableEntry { id: self.next_id, hits: 0.into(), rule }; self.rules.push(rte); } RulePlace::Insert(idx) => { - let rte = RuleTableEntry { id: self.next_id, hits: 0, rule }; + let rte = + RuleTableEntry { id: self.next_id, hits: 0.into(), rule }; self.rules.insert(idx, rte); } } @@ -1576,14 +1631,14 @@ impl RuleTable { } fn find_match( - &mut self, + &self, ifid: &InnerFlowId, pmeta: &MblkPacketData, ameta: &ActionMeta, ) -> Option<&Rule> { - for rte in self.rules.iter_mut() { + for rte in self.rules.iter() { if rte.rule.is_match(pmeta, ameta) { - rte.hits += 1; + rte.hits.fetch_add(1, Ordering::Relaxed); Self::rule_match_probe( self.port_c.as_c_str(), self.layer_c.as_c_str(), diff --git a/lib/opte/src/engine/packet.rs b/lib/opte/src/engine/packet.rs index 25ab6af3..d4a3d91d 100644 --- a/lib/opte/src/engine/packet.rs +++ b/lib/opte/src/engine/packet.rs @@ -2,7 +2,7 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. -// Copyright 2025 Oxide Computer Company +// Copyright 2026 Oxide Computer Company //! Types for creating, reading, and writing network packets. 
@@ -45,6 +45,7 @@ use crate::d_error::DError; use crate::ddi::mblk::MsgBlk; use crate::ddi::mblk::MsgBlkIterMut; use crate::ddi::mblk::MsgBlkNode; +use crate::engine::flow_table::FlowEntryInfo; use crate::engine::geneve::GeneveMeta; use alloc::boxed::Box; use alloc::string::String; @@ -790,6 +791,7 @@ where body_modified: false, len, inner_csum_dirty: false, + lfts: vec![], }, } } @@ -1387,6 +1389,14 @@ impl Packet> { l3.compute_checksum(); } } + + pub fn record_lft(&mut self, lft: Arc) { + self.state.lfts.push(lft); + } + + pub fn take_lfts(&mut self) -> Vec> { + self.state.lfts.drain(..).collect() + } } impl> PacketState @@ -1435,6 +1445,9 @@ pub struct FullParsed { /// Tracks whether any transform has been applied to this packet /// which would dirty the inner L3 and/or ULP header checksums. inner_csum_dirty: bool, + /// The set of all LFTs created or used by this packet as it + /// traverses the slow path. + lfts: Vec>, } /// Minimum-size zerocopy view onto a parsed packet, sufficient for fast diff --git a/lib/opte/src/engine/port/mod.rs b/lib/opte/src/engine/port/mod.rs index 34b231d1..f19e44b2 100644 --- a/lib/opte/src/engine/port/mod.rs +++ b/lib/opte/src/engine/port/mod.rs @@ -66,7 +66,11 @@ use crate::ddi::mblk::MsgBlkIterMut; use crate::ddi::sync::KMutex; use crate::ddi::sync::KRwLock; use crate::ddi::time::Moment; +use crate::engine::flow_table::EvictionPriority; use crate::engine::flow_table::ExpiryPolicy; +use crate::engine::flow_table::FLOW_DEF_TTL; +use crate::engine::flow_table::FlowEntryInfo; +use crate::engine::flow_table::FlowState; use crate::engine::headers::Valid; use crate::engine::packet::EmitSpec; use crate::engine::packet::PushSpec; @@ -76,10 +80,12 @@ use alloc::ffi::CString; use alloc::string::String; use alloc::string::ToString; use alloc::sync::Arc; +use alloc::sync::Weak; use alloc::vec::Vec; use core::ffi::CStr; use core::fmt; use core::fmt::Display; +use core::num::NonZeroU16; use core::num::NonZeroU32; use core::result; use 
core::str::FromStr; @@ -335,7 +341,7 @@ impl PortBuilder { &self.name, "tcp_flows", tcp_limit, - Some(Box::::default()), + Some(Arc::::default()), ), }; @@ -521,8 +527,19 @@ pub enum DumpLayerError { LayerNotFound, } +/// Operations that the main flow identifier type for a given [`NetworkImpl`] +/// must support. +// This should live in `opte::api`, but we don't want to introduce an +// API version change until this is something that *can* actually be specified +// on a per-port basis. +pub trait FlowId: + fmt::Debug + Send + Sync + Copy + Eq + Ord + core::hash::Hash +{ +} +impl FlowId for InnerFlowId {} + /// An entry in the Unified Flow Table. -pub struct UftEntry { +pub struct UftEntry { /// The flow ID for the other side. pair: KMutex>, @@ -537,11 +554,14 @@ pub struct UftEntry { epoch: u64, /// Cached reference to a flow's TCP state, if applicable. - /// This allows us to maintain up-to-date TCP flow table info - tcp_flow: Option>>, + /// This allows us to maintain up-to-date TCP flow table info without + /// performing a second lookup on the flowhash. 
+ tcp_flow: Option>>, + + parents: Vec>, } -impl Dump for UftEntry { +impl Dump for UftEntry { type DumpVal = UftEntryDump; fn dump(&self, hits: u64) -> Self::DumpVal { @@ -549,7 +569,7 @@ impl Dump for UftEntry { } } -impl Display for UftEntry { +impl Display for UftEntry { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { let hdr = self .xforms @@ -569,9 +589,10 @@ impl Display for UftEntry { } } -impl fmt::Debug for UftEntry { +impl fmt::Debug for UftEntry { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - let UftEntry { pair: _pair, xforms, l4_hash, epoch, tcp_flow } = self; + let UftEntry { pair: _pair, xforms, l4_hash, epoch, tcp_flow, parents } = + self; f.debug_struct("UftEntry") .field("pair", &"") @@ -579,10 +600,17 @@ impl fmt::Debug for UftEntry { .field("l4_hash", l4_hash) .field("epoch", epoch) .field("tcp_flow", tcp_flow) + .field("parents", parents) .finish() } } +impl FlowState for UftEntry { + fn parents(&self) -> impl Iterator> { + self.parents.iter().map(Arc::clone) + } +} + /// Cumulative counters for a single [`Port`]. #[derive(KStatProvider)] struct PortStats { @@ -1077,18 +1105,24 @@ impl Port { let now = now.unwrap_or_else(Moment::now); check_state!(data.state, [PortState::Running])?; + // Run expiry in reverse order of dependencies here. + // + // The presence of a higher-level entry prevents us from using the timer + // to expire any entry which it depends upon, and we propagate the + // last_hit time of each entry down to its children during expiry. + // + // A TCP state entry or UFT may in turn reference any number of LFT + // hits, so we visit those first to maximise the likelihood that we can + // clear up as many entries as possible. 
+ let _ = data.tcp_flows.expire_flows(now, |_| FLOW_ID_DEFAULT); + + let _ = data.uft_in.expire_flows(now, |_| FLOW_ID_DEFAULT); + let _ = data.uft_out.expire_flows(now, |_| FLOW_ID_DEFAULT); + for l in &mut data.layers { l.expire_flows(now); } - let _ = data.uft_in.expire_flows(now, |_| FLOW_ID_DEFAULT); - let _ = data.uft_out.expire_flows(now, |_| FLOW_ID_DEFAULT); - // XXX: TCP state expiry currently runs on a longer time scale than - // UFT entries, so we don't need to expire any extra UFT entries - // using the output Vec. If this changes, i.e., we - // set TIME_WAIT_EXPIRE_TTL or another state-specific timer lower - // than 60s, we'll need to specifically expire the matching UFTs. - let _ = data.tcp_flows.expire_flows(now, |_| FLOW_ID_DEFAULT); Ok(()) } @@ -1366,70 +1400,84 @@ impl Port { self.uft_hit_probe(dir, &flow_before, epoch, &process_start); let tcp = entry.state().tcp_flow.as_ref(); - if let Some(tcp_flow) = tcp { - tcp_flow.hit_at(process_start); - - let tcp = pkt - .meta() - .inner_tcp() - .expect("failed to find TCP state on known TCP flow"); - - let ufid_in = match dir { - Direction::In => Some(&flow_before), - Direction::Out => None, - }; - - let invalidated_tcp = match tcp_flow.state().update( - self.name_cstr.as_c_str(), - tcp, - dir, - pkt.len() as u64, - ufid_in, - ) { - Ok(TcpState::Closed) => Some(Arc::clone(tcp_flow)), - Err(TcpFlowStateError::NewFlow { .. }) => { - let out = Some(Arc::clone(tcp_flow)); - decision = FastPathDecision::Slow; - out - } - _ => None, - }; - - // Reacquire the writer to remove the flow if needed. - // Elevate lock to full scope, if we are reprocessing - // as well. - if let Some(entry) = invalidated_tcp { - let mut local_lock = self.data.write(); - - let flow_lock = entry.state().inner.lock(); - let ufid_out = &flow_lock.outbound_ufid; - - let ufid_in = flow_lock.inbound_ufid.as_ref(); - - // Because we've dropped the port lock, another packet could have - // also invalidated this flow and removed the entry. 
It could even - // install new UFT/TCP entries, depending on lock/process ordering. - // - // Verify that the state we want to remove still exists, and is - // `Arc`-identical. - if let Some(found_entry) = - local_lock.tcp_flows.get(ufid_out) - && Arc::ptr_eq(found_entry, &entry) - { - self.uft_tcp_closed( - &mut local_lock, - ufid_out, - ufid_in, - ); - _ = local_lock.tcp_flows.remove(ufid_out); - } + match tcp.map(|v| v.upgrade()) { + // This is a TCP flow, and the flow entry is still active. + Some(Some(tcp_flow)) => { + tcp_flow.hit_at(process_start); + + let tcp = pkt.meta().inner_tcp().expect( + "failed to find TCP state on known TCP flow", + ); + + let ufid_in = match dir { + Direction::In => Some(&flow_before), + Direction::Out => None, + }; + + let invalidated_tcp = match tcp_flow.state().update( + self.name_cstr.as_c_str(), + tcp, + dir, + pkt.len() as u64, + ufid_in, + ) { + Ok(TcpState::Closed) => Some(tcp_flow), + Err(TcpFlowStateError::NewFlow { .. }) => { + let out = Some(tcp_flow); + decision = FastPathDecision::Slow; + out + } + _ => None, + }; + + // Reacquire the writer to remove the flow if needed. + // Elevate lock to full scope, if we are reprocessing + // as well. + if let Some(entry) = invalidated_tcp { + let mut local_lock = self.data.write(); + + let flow_lock = entry.state().inner.lock(); + let ufid_out = &flow_lock.outbound_ufid; + + let ufid_in = flow_lock.inbound_ufid.as_ref(); + + // Because we've dropped the port lock, another + // packet could have also invalidated this flow and + // removed the entry. It could even install new + // UFT/TCP entries, depending on lock/process + // ordering. + // + // Verify that the state we want to remove still + // exists, and is `Arc`-identical. 
+ if let Some(found_entry) = + local_lock.tcp_flows.get(ufid_out) + && Arc::ptr_eq(found_entry, &entry) + { + self.uft_tcp_closed( + &mut local_lock, + ufid_out, + ufid_in, + ); + _ = local_lock.tcp_flows.remove(ufid_out); + } - // We've determined we're actually starting a new TCP flow (e.g., - // SYN on any other state) from an existing UFT entry. - if matches!(decision, FastPathDecision::Slow) { - lock = Some(local_lock); + // We've determined we're actually starting a new + // TCP flow (e.g., SYN on any other state) from an + // existing UFT entry. + if matches!(decision, FastPathDecision::Slow) { + lock = Some(local_lock); + } } } + // The TCP flow this UFT is associated with is gone. + // Fallback to the slowpath and regenerate it or acquire a + // new entry if one is present. + Some(None) => { + decision = FastPathDecision::Slow; + lock = Some(self.data.write()); + } + // There is no TCP flow. + None => {} } } _ => {} @@ -2150,7 +2198,7 @@ impl Port { TcpFlowEntryState::new_outbound(*ufid_out, tfs, pkt_len), ), }; - match tcp_flows.add_and_return(*ufid_out, tfes) { + match tcp_flows.add(*ufid_out, tfes) { Ok(entry) => Ok(TcpMaybeClosed::NewState(tcp_state, entry)), Err(OpteError::MaxCapacity(limit)) => { Err(ProcessError::FlowTableFull { kind: "TCP", limit }) @@ -2350,6 +2398,7 @@ impl Port { epoch, l4_hash: ufid_in.crc32(), tcp_flow: None, + parents: pkt.take_lfts(), }; // Keep around the comment on the `None` arm @@ -2392,9 +2441,12 @@ impl Port { // Found existing TCP flow, or have just created a new one. Ok(TcpMaybeClosed::NewState(_, flow)) => { // We have a good TCP flow, create a new UFT entry. 
- hte.tcp_flow = Some(flow); + hte.tcp_flow = Some(Arc::downgrade(&flow)); match data.uft_in.add(*ufid_in, hte) { - Ok(_) => Ok(InternalProcessResult::Modified), + Ok(v) => { + associate_lfts_upstack(&v, Direction::In); + Ok(InternalProcessResult::Modified) + } Err(OpteError::MaxCapacity(limit)) => { Err(ProcessError::FlowTableFull { kind: "UFT", @@ -2431,7 +2483,10 @@ impl Port { } } else { match data.uft_in.add(*ufid_in, hte) { - Ok(_) => Ok(InternalProcessResult::Modified), + Ok(v) => { + associate_lfts_upstack(&v, Direction::In); + Ok(InternalProcessResult::Modified) + } Err(OpteError::MaxCapacity(limit)) => { Err(ProcessError::FlowTableFull { kind: "UFT", limit }) } @@ -2529,7 +2584,9 @@ impl Port { } // Continue with processing. - Ok(TcpMaybeClosed::NewState(_, flow)) => Some(flow), + Ok(TcpMaybeClosed::NewState(_, flow)) => { + Some(Arc::downgrade(&flow)) + } // Unlike for existing flows, we don't allow through // unexpected packets here for now -- the `TcpState` FSM @@ -2581,6 +2638,7 @@ impl Port { epoch, l4_hash: flow_before.crc32(), tcp_flow, + parents: pkt.take_lfts(), }; match res { @@ -2590,7 +2648,10 @@ impl Port { return Ok(InternalProcessResult::Modified); } match data.uft_out.add(flow_before, hte) { - Ok(_) => Ok(InternalProcessResult::Modified), + Ok(v) => { + associate_lfts_upstack(&v, Direction::Out); + Ok(InternalProcessResult::Modified) + } Err(OpteError::MaxCapacity(limit)) => { Err(ProcessError::FlowTableFull { kind: "UFT", limit }) } @@ -2858,6 +2919,9 @@ pub struct TcpFlowEntryStateInner { segs_out: u64, bytes_in: u64, bytes_out: u64, + + inbound_parents: Vec>, + outbound_parents: Vec>, } pub struct TcpFlowEntryState { @@ -2880,6 +2944,9 @@ impl TcpFlowEntryState { segs_out: 0, bytes_in, bytes_out: 0, + + inbound_parents: vec![], + outbound_parents: vec![], }), } } @@ -2898,6 +2965,9 @@ impl TcpFlowEntryState { segs_out: 1, bytes_in: 0, bytes_out, + + inbound_parents: vec![], + outbound_parents: vec![], }), } } @@ -2942,6 +3012,47 @@ impl 
TcpFlowEntryState { } } +/// Install all parent-child links between a UFT entry, the LFT entries which +/// underpin it, and the TCP flow state entry if present. +fn associate_lfts_upstack( + uft: &Arc>>, + dir: Direction, +) { + // The goal here is to provide each LFT hit with two children where + // possible. These are the UFT and, when it exists, the TCP flow entry. + // What this means in practice is that while either is present, the LFTs + // should be immune to timer-driven expiry. They are *not* immune to + // explicit eviction. + // + // We don't arrange the UFT as a parent of the TCP flow entry because + // this would cause the existence of the flow table entry to hold the UFT + // entry in place even if the flow is mostly idle. This also sets us up for + // being able to have different eviction policies, table sizes, and timers + // on the UFT to keep it a small cache without breaking flows. + let uft_dyn: Arc = Arc::clone(uft) as _; + for lft in &uft.state().parents { + lft.push_child(&uft_dyn); + } + if let Some(tcp) = uft.state().tcp_flow.as_ref().and_then(Weak::upgrade) { + let tcp_dyn: Arc = Arc::clone(&tcp) as _; + let mut parents = uft.state().parents.clone(); + let mut inner = tcp.state().inner.lock(); + let new_parent_slot = match dir { + Direction::In => &mut inner.inbound_parents, + Direction::Out => &mut inner.outbound_parents, + }; + + core::mem::swap(new_parent_slot, &mut parents); + let old_parents = parents; + for old_lft in old_parents { + old_lft.remove_child(&tcp_dyn); + } + for new_lft in new_parent_slot { + new_lft.push_child(&tcp_dyn); + } + } +} + impl core::fmt::Debug for TcpFlowEntryState { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let inner = self.inner.lock(); @@ -2990,6 +3101,17 @@ impl Dump for TcpFlowEntryState { } } +impl FlowState for TcpFlowEntryState { + fn parents(&self) -> impl Iterator> { + let inner = self.inner.lock(); + inner + .inbound_parents + .clone() + .into_iter() + 
.chain(inner.outbound_parents.clone()) + } +} + /// Expiry behaviour for TCP flows dependent on the connection FSM. #[derive(Debug)] pub struct TcpExpiry { @@ -3028,6 +3150,53 @@ impl ExpiryPolicy for TcpExpiry { }; ttl.is_expired(entry.last_hit(), now) } + + fn eviction_priority( + &self, + entry: &FlowEntry, + now: Moment, + ) -> Option { + let delta = now.delta_as_millis(entry.last_hit()); + match entry.state().tcp_state() { + TcpState::TimeWait + | TcpState::CloseWait + | TcpState::FinWait1 + | TcpState::FinWait2 => match delta { + // We can remain in a closing state for quite some time. We need + // to be willing to evict such entries from this cache fairly + // quickly, but leave them in place for the ~120s when we are + // not under pressure. + a if a > self.incipient_ttl.as_milliseconds() => { + Some(EvictionPriority::Evictable(NonZeroU16::MIN)) + } + _ => Some(EvictionPriority::Protected), + }, + TcpState::SynSent + | TcpState::SynRcvd + | TcpState::Listen + | TcpState::LastAck => match delta { + // Ordinarily we will expire flows in these states quickly. If + // we are under table pressure, we can allow them to be + // explicitly selected for removal faster. + a if a > self.incipient_ttl.as_milliseconds() / 2 => { + Some(EvictionPriority::Evictable(NonZeroU16::MIN)) + } + _ => Some(EvictionPriority::Protected), + }, + // Allow established flows to be evicted on the same time scale as + // UFT/LFT expiry, but if we are not under pressure then we aim to + // keep the LFTs in place even while the flow is inactive. 
+ TcpState::Established + if delta >= FLOW_DEF_TTL.as_milliseconds() => + { + Some(EvictionPriority::Evictable(NonZeroU16::MIN)) + } + TcpState::Established => Some(EvictionPriority::Protected), + TcpState::Closed => { + Some(EvictionPriority::Evictable(NonZeroU16::MAX)) + } + } + } } #[cfg(all(not(feature = "std"), not(test)))] diff --git a/lib/oxide-vpc/tests/integration_tests.rs b/lib/oxide-vpc/tests/integration_tests.rs index 7472883c..4f322d1f 100644 --- a/lib/oxide-vpc/tests/integration_tests.rs +++ b/lib/oxide-vpc/tests/integration_tests.rs @@ -3514,22 +3514,31 @@ fn tcp_outbound() { // TCP flow expiry behaviour // ================================================================ // - UFTs for individual flows live on the same cadence as other traffic. + // We expect these to expire, as the TCP flow will not pin their + // lifetime. + // - The presence of the TCP flow entry will keep the firewall entry alive. + // If the UFT were present, it would serve the same purpose. // - TCP state machine info should be cleaned up after an active close. + // // TimeWait state has a ~2min lifetime before we flush it -- it should still - // be present at UFT expiry: + // be present at UFT expiry. let now = Moment::now(); g1.port .expire_flows_at(now + Duration::new(FLOW_DEF_EXPIRE_SECS + 1, 0)) .unwrap(); - zero_flows!(g1); + decr!(g1, ["uft.in, uft.out"]); assert_eq!(TcpState::TimeWait, g1.port.tcp_state(&flow).unwrap()); // The TCP flow state should then be flushed after 2 mins. // Note that this case applies to any active-close initiated by the // guest, irrespective of inbound/outbound. + // + // Once this flow is removed, the LFTs associated with the flow become + // eligible for time-based expiry. g1.port .expire_flows_at(now + Duration::new(TIME_WAIT_EXPIRE_SECS + 1, 0)) .unwrap(); + zero_flows!(g1); assert_eq!(None, g1.port.tcp_state(&flow)); }