From f826b0f1ab853e922e313f21551ae8768001803b Mon Sep 17 00:00:00 2001 From: Alin Sinpalean Date: Fri, 22 May 2026 08:34:18 +0000 Subject: [PATCH 1/4] feat: Add `ColdStats` aggregate to `CanisterStates` MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Maintains a small `ColdStats` aggregate over the canisters in the `cold` pool, updated incrementally on every transition into / out of `cold`. This lets the "touch every canister" aggregate queries — `total_compute_allocation`, `total_canister_memory_usage`, `memory_taken`, `callback_count`, `guaranteed_response_message_memory_taken`, `best_effort_message_memory_taken` — run in `O(|hot|)` instead of `O(|all canisters|)`, which is the primary motivation for the hot/cold split on subnets with a long tail of idle canisters. The aggregates are derived (not persisted) and are reconstructed by `CanisterStates::new` on checkpoint load. `debug_assert_invariants`, which now also runs an `O(|cold|)` recompute and compares the result against the live aggregate, verifies in debug builds that every mutating method keeps them in sync, and the `ColdStats` struct stays module-private — callers always reach the totals through the public aggregator methods on `CanisterStates`. `MemoryTaken`'s fields are bumped from private to `pub(crate)` so that `CanisterStates::memory_taken` can construct the struct directly, keeping `MemoryTaken` in its current home in `replicated_state.rs`. `CanisterStates::memory_taken` itself is `pub(crate)` and will be wired up to `ReplicatedState::memory_taken` in the next PR; an `#[allow(dead_code)]` keeps the build warning-free until then. Aggregator behaviour is exercised by two new tests (`memory_aggregators_combine_hot_and_cold`, `callback_count_combines_hot_and_cold`) and the bookkeeping discipline is exercised by an extended set of `*_updates_cold_stats*` tests covering `insert`, `remove`, `try_cool*`, `for_each_mut`, `try_for_each_mut`, and `retain`. 
Co-authored-by: Cursor --- rs/replicated_state/src/canister_states.rs | 356 ++++++++++++++++-- .../src/canister_states/tests.rs | 206 +++++++++- rs/replicated_state/src/replicated_state.rs | 10 +- 3 files changed, 515 insertions(+), 57 deletions(-) diff --git a/rs/replicated_state/src/canister_states.rs b/rs/replicated_state/src/canister_states.rs index 1113df379c64..e7cba03c7aaa 100644 --- a/rs/replicated_state/src/canister_states.rs +++ b/rs/replicated_state/src/canister_states.rs @@ -14,12 +14,17 @@ //! [`CanisterStates::try_cool`] for single canisters or //! [`CanisterStates::try_cool_all`] for a bulk pass. //! -//! This module currently only provides the partitioning machinery. The -//! `ReplicatedState` integration and the `O(1)` cold-pool aggregates are -//! introduced in follow-up changes; right now `CanisterStates` is not yet -//! wired into `ReplicatedState`. +//! Internally, `CanisterStates` also maintains `ColdStats`, a small set of +//! aggregates over the cold pool that lets several aggregated queries (e.g. +//! [`CanisterStates::total_compute_allocation`], +//! [`CanisterStates::total_canister_memory_usage`], +//! [`CanisterStates::callback_count`]) become `O(|hot|)` instead of +//! `O(|all canisters|)`. These aggregates are an implementation detail: callers +//! always go through the public aggregator methods. use crate::CanisterState; +use crate::replicated_state::MemoryTaken; +use ic_base_types::NumBytes; use ic_types::CanisterId; use ic_validate_eq::ValidateEq; use ic_validate_eq_derive::ValidateEq; @@ -31,6 +36,100 @@ use std::sync::Arc; #[cfg(test)] mod tests; +/// O(1) aggregates over the canisters currently in the `cold` pool. +/// +/// Maintained incrementally: every transition into / out of `cold` adds or +/// subtracts the contributing canister's values. 
The aggregates here are +/// expected to be a small enough set that the bookkeeping cost on each +/// transition is constant time and the resulting cost savings on the +/// `O(|all canisters|)` aggregate computations in `ReplicatedState` are +/// significant. +/// +/// Crucially, `ColdStats` is *derived* state: it is recomputable from the +/// `cold` map at any time and is **not** persisted in checkpoints. It is +/// reconstructed when canisters are inserted (e.g. on checkpoint load). +/// +/// Private to the module: external callers reach the same totals through +/// [`CanisterStates::total_compute_allocation`], +/// [`CanisterStates::total_canister_memory_usage`] and +/// [`CanisterStates::callback_count`], which transparently combine the cold +/// aggregate with an `O(|hot|)` pass over hot canisters. +#[derive(Clone, PartialEq, Eq, Debug, Default)] +struct ColdStats { + /// Number of canisters in `cold`. Equal to `cold.len()`. + count: usize, + /// Sum of `ComputeAllocation::as_percent()` across cold canisters. + total_compute_allocation_percent: u64, + /// Sum of `memory_allocation().allocated_bytes(memory_usage())`. + raw_memory: NumBytes, + /// Sum of `memory_usage()`. + memory_usage: NumBytes, + /// Sum of `system_state.guaranteed_response_message_memory_usage()`. Always + /// `0` while invariants hold (cold canisters have empty queues), but + /// tracked for symmetry / debug-time invariant checking. + guaranteed_response_message_memory: NumBytes, + /// Sum of `wasm_custom_sections_memory_usage()`. + wasm_custom_sections_memory: NumBytes, + /// Sum of `canister_history_memory_usage()`. + canister_history_memory: NumBytes, + /// Sum of `ccm.unresponded_callback_count()` (which for cold canisters + /// equals the number of guaranteed-response callbacks; best-effort + /// callbacks force the canister into `hot`). + callback_count: usize, +} + +impl ColdStats { + /// Adds the contribution of `canister` to the aggregates. 
+ fn add(&mut self, canister: &CanisterState) { + self.count += 1; + self.total_compute_allocation_percent += canister.compute_allocation().as_percent(); + let memory_usage = canister.memory_usage(); + self.raw_memory += canister.memory_allocation().allocated_bytes(memory_usage); + self.memory_usage += memory_usage; + self.guaranteed_response_message_memory += canister + .system_state + .guaranteed_response_message_memory_usage(); + self.wasm_custom_sections_memory += canister.wasm_custom_sections_memory_usage(); + self.canister_history_memory += canister.canister_history_memory_usage(); + self.callback_count += canister + .system_state + .call_context_manager() + .map_or(0, |ccm| ccm.unresponded_callback_count()); + } + + /// Subtracts the contribution of `canister` from the aggregates. + fn sub(&mut self, canister: &CanisterState) { + self.count -= 1; + self.total_compute_allocation_percent -= canister.compute_allocation().as_percent(); + let memory_usage = canister.memory_usage(); + self.raw_memory -= canister.memory_allocation().allocated_bytes(memory_usage); + self.memory_usage -= memory_usage; + self.guaranteed_response_message_memory -= canister + .system_state + .guaranteed_response_message_memory_usage(); + self.wasm_custom_sections_memory -= canister.wasm_custom_sections_memory_usage(); + self.canister_history_memory -= canister.canister_history_memory_usage(); + self.callback_count -= canister + .system_state + .call_context_manager() + .map_or(0, |ccm| ccm.unresponded_callback_count()); + } + + /// Computes `ColdStats` from scratch over the provided cold canisters. + /// Used to (re-)derive the aggregates, e.g. after loading from checkpoint + /// and as a `debug_assert!` sanity check inside `debug_assert_invariants`. + fn recompute<'a, I>(cold: I) -> Self + where + I: IntoIterator>, + { + let mut stats = ColdStats::default(); + for c in cold { + stats.add(c.as_ref()); + } + stats + } +} + /// Hot/cold-partitioned collection of canister states. 
/// /// See the module-level docs for the overall design. The two underlying @@ -39,8 +138,8 @@ mod tests; /// a flat `BTreeMap>` would. /// /// `PartialEq` and `ValidateEq` are derived: two `CanisterStates` are equal iff -/// they have the same partition (hot vs. cold). This makes the partition -/// observable through equality assertions in tests. +/// they have the same partition (hot vs. cold) and the same `cold_stats`. This +/// makes the partition observable through equality assertions in tests. /// /// # Invariants /// @@ -48,13 +147,16 @@ mod tests; /// checked in debug builds by `debug_assert_invariants`: /// /// 1. `hot` and `cold` pools are disjoint (no canister ID in both); -/// 2. every canister in the `cold` pool satisfies `CanisterState::is_cold()`. +/// 2. every canister in the `cold` pool satisfies `CanisterState::is_cold()`; +/// 3. `cold_stats` matches a fresh recomputation over the `cold` pool. /// /// Additionally, the **strict** partition invariant — that every canister in /// the `hot` pool does *not* satisfy `is_cold()` — holds after -/// [`Self::try_cool_all`] and is verified by [`Self::validate_strict_split`]. -/// Between repartitioning passes the `hot` pool may contain canisters that -/// have gone quiet but have not yet been demoted; this is by design. +/// [`Self::try_cool_all`] / +/// [`crate::ReplicatedState::repartition_canister_states`], and is verified +/// during checkpoint validation by [`Self::validate_strict_split`]. Between +/// repartitioning passes the `hot` pool may contain canisters that have gone +/// quiet but have not yet been demoted; this is by design. #[derive(Clone, Debug, Default, PartialEq, ValidateEq)] pub struct CanisterStates { /// Canisters that may have round-level work or are recently active. Always @@ -68,6 +170,9 @@ pub struct CanisterStates { /// need to consider them; per-round operations should skip them. 
#[validate_eq(CompareWithValidateEq)] cold: BTreeMap>, + + /// O(1) aggregates over `cold` canisters. See [`ColdStats`]. + cold_stats: ColdStats, } impl CanisterStates { @@ -79,14 +184,20 @@ impl CanisterStates { pub fn new(canisters: BTreeMap>) -> Self { let mut hot = BTreeMap::new(); let mut cold = BTreeMap::new(); + let mut cold_stats = ColdStats::default(); for (id, canister) in canisters { if canister.is_cold() { + cold_stats.add(canister.as_ref()); cold.insert(id, canister); } else { hot.insert(id, canister); } } - let states = Self { hot, cold }; + let states = Self { + hot, + cold, + cold_stats, + }; states.debug_assert_invariants(); states } @@ -98,7 +209,8 @@ impl CanisterStates { } /// Returns a mutable reference to the `Arc` in `hot`. If the - /// canister is currently in `cold`, it is first promoted (moved to `hot`). + /// canister is currently in `cold`, it is first promoted (moved to `hot`, with + /// `cold_stats` updated accordingly). /// /// This is the back door used by all mutating accessors on `ReplicatedState`. /// Anyone holding the returned `&mut Arc` may freely mutate the @@ -115,7 +227,8 @@ impl CanisterStates { Entry::Vacant(entry) => { if let Some(canister) = self.cold.remove(id) { - // Was in the `cold` pool, promote it. + // Was in the `cold` pool, update the stats and promote it. + self.cold_stats.sub(canister.as_ref()); let canister = entry.insert(canister); // Unfortunately, the borrow checker won't let us do this here. // self.debug_assert_invariants(); @@ -144,15 +257,16 @@ impl CanisterStates { } /// Inserts a canister into the appropriate pool. If a canister with this - /// ID was already present (in either pool), it is replaced and returned. + /// ID was already present (in either pool), it is replaced and returned; + /// `cold_stats` is adjusted accordingly. 
pub fn insert(&mut self, canister: Arc) -> Option> { + // Drop any previous entry first so `cold_stats` doesn't double-count + // when we transition from / to the cold pool. let id = canister.canister_id(); - // Drop any previous entry first so that the partition reflects the - // freshly-inserted canister's `is_cold()` regardless of where the - // old entry lived. let prev = self.remove(&id); if canister.is_cold() { + self.cold_stats.add(canister.as_ref()); self.cold.insert(id, canister); } else { self.hot.insert(id, canister); @@ -162,22 +276,30 @@ impl CanisterStates { } /// Removes and returns the canister with the given ID from whichever pool - /// it is in. + /// it is in. Updates `cold_stats` if the canister was in `cold`. pub fn remove(&mut self, id: &CanisterId) -> Option> { - let removed = self.hot.remove(id).or_else(|| self.cold.remove(id)); + let removed = if let Some(canister) = self.hot.remove(id) { + Some(canister) + } else if let Some(canister) = self.cold.remove(id) { + self.cold_stats.sub(canister.as_ref()); + Some(canister) + } else { + None + }; self.debug_assert_invariants(); removed } /// Re-evaluates `is_cold()` for the given canister and, if true, moves the - /// canister from `hot` to `cold`. No-op if the canister is not present, - /// already in `cold`, or not cold. + /// canister from `hot` to `cold`, updating `cold_stats`. No-op if the canister + /// is not present, already in `cold`, or not cold. /// /// Returns true iff a transition (hot → cold) actually happened. 
pub fn try_cool(&mut self, id: &CanisterId) -> bool { let cooled = match self.hot.entry(*id) { Entry::Occupied(entry) if entry.get().is_cold() => { let canister = entry.remove(); + self.cold_stats.add(canister.as_ref()); self.cold.insert(*id, canister); true } @@ -210,6 +332,7 @@ impl CanisterStates { .collect(); for id in to_cool { let canister = self.hot.remove(&id).unwrap(); + self.cold_stats.add(canister.as_ref()); self.cold.insert(id, canister); } self.debug_assert_invariants(); @@ -255,15 +378,18 @@ impl CanisterStates { } /// Visits every canister (hot and cold) and runs `f` against it, then - /// re-establishes the hot/cold partition. + /// re-establishes strict hot/cold partitioning. /// /// This is the safe way to perform "touch every canister" loops (storage /// charging, checkpoint write-out, …) that may legitimately mutate cold - /// canisters in ways affecting [`CanisterState::is_cold`]. + /// canisters in ways affecting [`CanisterState::is_cold`] *or* any field + /// aggregated in `ColdStats` (`compute_allocation`, `memory_allocation`, + /// canister history size, callbacks). /// /// Iterates the hot pool followed by the cold pool — i.e. canisters are /// **not** yielded in `CanisterId` order — but every canister is visited - /// exactly once. Cost is `O(|hot| + |cold|)`. + /// exactly once. Cost is `O(|hot| + |cold|)`, with a small constant per + /// cold canister for the `cold_stats` bookkeeping. pub fn for_each_mut(&mut self, mut f: F) where F: FnMut(&CanisterId, &mut Arc), @@ -274,15 +400,22 @@ impl CanisterStates { f(id, canister); } - // Cold pool: iterate in place. The closure may flip the canister to - // non-cold, in which case we promote it to `hot` after the loop. + // Cold pool: iterate in place. 
Because the closure can mutate fields + // that affect `cold_stats` or `is_cold()`, we conservatively subtract + // each canister's pre-call contribution before running `f`, then + // either re-add (if it stays cold) or queue it for promotion to + // `hot` (if `f` flipped it into a non-cold state). let mut to_promote: Vec = Vec::new(); for (id, canister) in self.cold.iter_mut() { + self.cold_stats.sub(canister.as_ref()); f(id, canister); - if !canister.is_cold() { + if canister.is_cold() { + self.cold_stats.add(canister.as_ref()); + } else { to_promote.push(*id); } } + // Promote all canisters that are now hot. for id in to_promote { let canister = self.cold.remove(&id).unwrap(); self.hot.insert(id, canister); @@ -310,13 +443,16 @@ impl CanisterStates { } // Cold pool: same in-place strategy as `for_each_mut`, but short-circuit - // on `Err`. We always preserve the partition for the canister we were - // visiting when the error occurred, then propagate the error. + // on `Err`. We always restore the partition / stats for the canister we + // were visiting when the error occurred, then propagate the error. let mut to_promote: Vec = Vec::new(); if result.is_ok() { for (id, canister) in self.cold.iter_mut() { + self.cold_stats.sub(canister.as_ref()); let res = f(id, canister); - if !canister.is_cold() { + if canister.is_cold() { + self.cold_stats.add(canister.as_ref()); + } else { to_promote.push(*id); } if let Err(e) = res { @@ -325,6 +461,7 @@ impl CanisterStates { } } } + // Promote all canisters that are now hot. for id in to_promote { let canister = self.cold.remove(&id).unwrap(); self.hot.insert(id, canister); @@ -336,7 +473,8 @@ impl CanisterStates { result } - /// Retains only the canisters for which the predicate returns true. + /// Retains only the canisters for which the predicate returns true, updating + /// `cold_stats` to account for any cold canister that was removed. /// /// Iterates both pools. 
pub fn retain(&mut self, f: F) @@ -344,10 +482,153 @@ impl CanisterStates { F: Fn(&CanisterId, &Arc) -> bool, { self.hot.retain(|id, c| f(id, c)); - self.cold.retain(|id, c| f(id, c)); + self.cold.retain(|id, c| { + if f(id, c) { + true + } else { + self.cold_stats.sub(c.as_ref()); + false + } + }); self.debug_assert_invariants(); } + /// Returns the total reserved compute allocation (as a sum of percentage + /// points) across every canister in either pool. + /// + /// `O(|hot canisters|)` thanks to the precomputed cold-pool aggregate. + pub fn total_compute_allocation(&self) -> u64 { + let hot: u64 = self + .hot + .values() + .map(|canister| canister.compute_allocation().as_percent()) + .sum(); + hot + self.cold_stats.total_compute_allocation_percent + } + + /// Returns the total number of callbacks (responded and unresponded) + /// registered across every canister in either pool. + /// + /// `O(|hot canisters|)` thanks to the precomputed cold-pool aggregate. + pub fn callback_count(&self) -> usize { + let hot: usize = self + .hot + .values() + .map(|canister| { + canister + .system_state + .call_context_manager() + .map_or(0, |ccm| ccm.unresponded_callback_count()) + }) + .sum(); + hot + self.cold_stats.callback_count + } + + /// Returns the total memory usage of all canisters in either pool, including + /// message memory. + /// + /// `O(|hot canisters|)` thanks to the precomputed cold-pool aggregate. + pub fn total_canister_memory_usage(&self) -> NumBytes { + let hot: NumBytes = self + .hot + .values() + .map(|canister| canister.memory_usage() + canister.message_memory_usage().total()) + .sum(); + // Cold canisters have empty queues by `CanisterState::is_cold`, so they + // contribute no message memory usage. + hot + self.cold_stats.memory_usage + } + + /// Returns the total guaranteed-response message memory (including + /// reservations) across every canister in either pool. Does **not** + /// include subnet queues. 
+ /// + /// `O(|hot canisters|)` thanks to the precomputed cold-pool aggregate. + pub fn guaranteed_response_message_memory_taken(&self) -> NumBytes { + let hot: NumBytes = self + .hot + .values() + .map(|canister| { + canister + .system_state + .guaranteed_response_message_memory_usage() + }) + .sum(); + hot + self.cold_stats.guaranteed_response_message_memory + } + + /// Returns the total best-effort message memory across every canister in + /// either pool. Does **not** include subnet queues. + /// + /// `O(|hot canisters|)` — cold canisters by definition use no best-effort + /// message memory (see `CanisterState::is_cold`). + pub fn best_effort_message_memory_taken(&self) -> NumBytes { + self.hot + .values() + .map(|canister| canister.system_state.best_effort_message_memory_usage()) + .sum() + } + + /// Computes the per-resource [`MemoryTaken`] aggregate across every + /// canister in either pool. + /// + /// Does **not** include subnet queues, call `ReplicatedState::memory_taken` + /// for the actual subnet-wide memory usage. + /// + /// `O(|hot canisters|)` thanks to the precomputed cold-pool aggregate. + // The only non-test caller of this method is + // `ReplicatedState::memory_taken`, which is wired up in the follow-up + // integration PR. 
+ #[allow(dead_code)] + pub(crate) fn memory_taken(&self) -> MemoryTaken { + let ( + mut execution, + mut guaranteed_response_messages, + best_effort_messages, + mut wasm_custom_sections, + mut canister_history, + ) = self + .hot + .values() + .map(|canister| { + ( + canister.memory_allocated_bytes(), + canister + .system_state + .guaranteed_response_message_memory_usage(), + canister.system_state.best_effort_message_memory_usage(), + canister.wasm_custom_sections_memory_usage(), + canister.canister_history_memory_usage(), + ) + }) + .reduce(|accum, val| { + ( + accum.0 + val.0, + accum.1 + val.1, + accum.2 + val.2, + accum.3 + val.3, + accum.4 + val.4, + ) + }) + .unwrap_or_default(); + + let cold = &self.cold_stats; + execution += cold.raw_memory; + guaranteed_response_messages += cold.guaranteed_response_message_memory; + // `best_effort_messages` has no cold contribution: cold canisters + // have empty queues. + wasm_custom_sections += cold.wasm_custom_sections_memory; + canister_history += cold.canister_history_memory; + + MemoryTaken { + execution, + guaranteed_response_messages, + best_effort_messages, + wasm_custom_sections, + canister_history, + } + } + /// Validates that the current hot/cold partition matches the canonical "strict" /// split that [`CanisterStates::new`] would produce over the same set of /// canisters: @@ -372,10 +653,10 @@ impl CanisterStates { } /// Debug-only consistency check, called at the end of every mutating operation. - /// Verifies invariants (1)–(2) listed under [`CanisterStates`]. + /// Verifies invariants (1)–(3) listed under [`CanisterStates`]. /// /// Also see [`Self::validate_strict_split`] for the stricter validation applied - /// during checkpoint validation. + /// at checkpointing time. 
fn debug_assert_invariants(&self) { debug_assert!( self.hot.keys().all(|id| !self.cold.contains_key(id)), @@ -385,10 +666,15 @@ impl CanisterStates { self.cold.values().all(|c| c.is_cold()), "cold pool contains a canister that is not cold", ); + debug_assert_eq!( + ColdStats::recompute(self.cold.values()), + self.cold_stats, + "cold_stats is out of sync with the cold pool" + ); } } -/// Iterator returned by [`CanisterStates::all_iter`]. Merge-yields entries from +/// Iterator returned by [`CanisterStates::iter`]. Merge-yields entries from /// the `hot` and `cold` pools in `CanisterId` order. pub struct Iter<'a> { hot: Peekable>>, diff --git a/rs/replicated_state/src/canister_states/tests.rs b/rs/replicated_state/src/canister_states/tests.rs index 7eb5a5bdb72c..f130d18f6b5e 100644 --- a/rs/replicated_state/src/canister_states/tests.rs +++ b/rs/replicated_state/src/canister_states/tests.rs @@ -5,8 +5,11 @@ use ic_base_types::NumSeconds; use ic_registry_subnet_type::SubnetType; use ic_test_utilities_types::ids::canister_test_id; use ic_test_utilities_types::messages::RequestBuilder; -use ic_types::messages::RequestOrResponse; -use ic_types_cycles::Cycles; +use ic_types::messages::{CallContextId, NO_DEADLINE, RequestOrResponse}; +use ic_types::methods::{Callback, WasmClosure}; +use ic_types::time::CoarseTime; +use ic_types::{ComputeAllocation, NumBytes}; +use ic_types_cycles::{CanisterCyclesCostSchedule, CompoundCycles, Cycles}; use std::sync::Arc; fn make_canister(id: u64) -> Arc { @@ -124,7 +127,7 @@ fn insert_classifies_on_the_fly() { } #[test] -fn insert_replaces_existing_canister() { +fn insert_replaces_existing_canister_and_updates_cold_stats() { let mut states = CanisterStates::default(); let cold = cold_canister(1); @@ -132,21 +135,24 @@ fn insert_replaces_existing_canister() { assert!(states.insert(Arc::clone(&cold)).is_none()); assert_eq!(states.hot.len(), 0); assert_eq!(states.cold.len(), 1); + assert_eq!(states.cold_stats.count, 1); // Replace with a hot 
version of the same canister: returns the cold one, - // partition is updated. + // `cold_stats` reflects the new (empty) cold pool. let hot = hot_canister(1); let prev = states.insert(Arc::clone(&hot)).expect("upsert"); assert!(Arc::ptr_eq(&prev, &cold)); assert_eq!(states.hot.len(), 1); assert_eq!(states.cold.len(), 0); + assert_eq!(states.cold_stats.count, 0); - // Replace again with a cold version: partition flips back. + // Replace again with a cold version: cold_stats picks it back up. let cold_again = cold_canister(1); let prev = states.insert(Arc::clone(&cold_again)).expect("upsert"); assert!(Arc::ptr_eq(&prev, &hot)); assert_eq!(states.hot.len(), 0); assert_eq!(states.cold.len(), 1); + assert_eq!(states.cold_stats.count, 1); } #[test] @@ -164,7 +170,7 @@ fn get_mut_heats_a_cold_canister() { } #[test] -fn remove_takes_canister_from_either_pool() { +fn remove_updates_cold_stats() { let mut states = CanisterStates::default(); let canister = cold_canister(1); states.insert(Arc::clone(&canister)); @@ -266,8 +272,11 @@ fn for_each_mut_visits_every_canister_and_repartitions() { fn for_each_mut_demotes_a_hot_canister_that_became_cold() { let mut states = CanisterStates::default(); - // c1: hot (has an input message). - let c1 = hot_canister(1); + // c1: hot (has an input message). Tag it with a distinguishable compute + // allocation so that we can verify it lands in `cold_stats` after demotion. + let mut c1 = hot_canister(1); + Arc::make_mut(&mut c1).system_state.compute_allocation = + ComputeAllocation::try_from(42).unwrap(); // c2: already cold, kept untouched as a partition witness. let c2 = cold_canister(2); @@ -276,6 +285,10 @@ fn for_each_mut_demotes_a_hot_canister_that_became_cold() { assert_eq!(states.hot.len(), 1); assert_eq!(states.cold.len(), 1); + // c1 is in hot, so its 42% compute allocation is not yet in `cold_stats`. 
+ assert_eq!(states.cold_stats.total_compute_allocation_percent, 0); + assert_eq!(states.total_compute_allocation(), 42); + // Drain c1's input queue inside the closure: this flips c1 from non-cold // back to cold. `for_each_mut` must demote it as part of its final // `try_cool_all` pass. @@ -290,9 +303,32 @@ fn for_each_mut_demotes_a_hot_canister_that_became_cold() { }); assert_eq!(visited, vec![c1.canister_id(), c2.canister_id()]); - // Both canisters end up in cold. + // Both canisters end up in cold; cold_stats picks up c1's 42% compute. assert_eq!(states.hot.len(), 0); assert_eq!(states.cold.len(), 2); + assert_eq!(states.cold_stats.total_compute_allocation_percent, 42); + assert_eq!(states.total_compute_allocation(), 42); +} + +#[test] +fn for_each_mut_updates_cold_stats_in_place() { + let mut states = CanisterStates::default(); + let c = cold_canister(1); + states.insert(c); + // Sanity: c is cold and contributes 0% compute allocation. + assert_eq!(states.cold.len(), 1); + assert_eq!(states.total_compute_allocation(), 0); + + // Mutate `compute_allocation` on the cold canister without affecting + // `is_cold()`. The in-place iteration must update `cold_stats` accordingly. + states.for_each_mut(|_id, canister| { + Arc::make_mut(canister).system_state.compute_allocation = + ComputeAllocation::try_from(42).unwrap(); + }); + + assert_eq!(states.hot.len(), 0); + assert_eq!(states.cold.len(), 1); + assert_eq!(states.total_compute_allocation(), 42); } #[test] @@ -378,7 +414,7 @@ fn try_for_each_mut_short_circuits_on_cold_error_and_promotes_visited() { Ok(()) } else if *id == c2.canister_id() { // Mutate AND return Err on the same canister: c2's mutation must - // still be reflected in the partition. + // still be reflected in the partition / cold_stats. push_input(canister); Err("boom") } else { @@ -390,25 +426,34 @@ fn try_for_each_mut_short_circuits_on_cold_error_and_promotes_visited() { // Iteration stopped at c2; c3 never visited. 
assert_eq!(visited, vec![c1.canister_id(), c2.canister_id()]); // c1 and c2 both became non-cold during iteration: both end up in hot. - // c3 remains in cold. + // c3 remains in cold. `debug_assert_invariants` (run inside + // `try_for_each_mut`) covers the matching `cold_stats` invariant. assert!(states.hot.contains_key(&c1.canister_id())); assert!(states.hot.contains_key(&c2.canister_id())); assert!(states.cold.contains_key(&c3.canister_id())); } #[test] -fn retain_drops_canisters_from_both_pools() { +fn retain_updates_cold_stats_for_removed_cold_canisters() { let mut states = CanisterStates::default(); - let c1 = cold_canister(1); - let c2 = cold_canister(2); - let c3 = hot_canister(3); - let c4 = hot_canister(4); + let mut c1 = cold_canister(1); + let mut c2 = cold_canister(2); + let mut c3 = hot_canister(3); + let mut c4 = hot_canister(4); + // Give every canister a distinguishable compute allocation so we can spot + // bookkeeping errors via `total_compute_allocation()`. + for (i, canister) in [&mut c1, &mut c2, &mut c3, &mut c4].into_iter().enumerate() { + Arc::make_mut(canister).system_state.compute_allocation = + ComputeAllocation::try_from(10 + 10 * i as u64).unwrap(); + } states.insert(Arc::clone(&c1)); states.insert(Arc::clone(&c2)); states.insert(Arc::clone(&c3)); states.insert(Arc::clone(&c4)); assert_eq!(states.hot.len(), 2); assert_eq!(states.cold.len(), 2); + // 10% + 20% + 30% + 40% = 100%. + assert_eq!(states.total_compute_allocation(), 100); let keep = [c1.canister_id(), c3.canister_id()]; states.retain(|id, _| keep.contains(id)); @@ -419,6 +464,133 @@ fn retain_drops_canisters_from_both_pools() { assert!(!states.contains_key(&c2.canister_id())); assert!(states.contains_key(&c3.canister_id())); assert!(!states.contains_key(&c4.canister_id())); + // 10% (c1) + 30% (c3) = 40%. 
+ assert_eq!(states.total_compute_allocation(), 40); +} + +#[test] +fn memory_aggregators_combine_hot_and_cold() { + let mut c1 = cold_canister(1); + let mut c2 = hot_canister(2); + // Distinguishable memory allocation reservations so that `execution` + // depends visibly on both canisters. + Arc::make_mut(&mut c1).system_state.memory_allocation = NumBytes::new(10_000_000).into(); + Arc::make_mut(&mut c2).system_state.memory_allocation = NumBytes::new(20_000_000).into(); + + let mut states = CanisterStates::default(); + states.insert(Arc::clone(&c1)); + states.insert(Arc::clone(&c2)); + // Both pools must be exercised, otherwise this test does not cover the + // hot-iteration + cold-aggregate combination. + assert_eq!(states.hot.len(), 1); + assert_eq!(states.cold.len(), 1); + + // Expected values, computed by summing over every canister independently + // of the hot/cold split. + let canisters = [&c1, &c2]; + let expected_memory_usage: NumBytes = canisters + .iter() + .map(|c| c.memory_usage() + c.message_memory_usage().total()) + .sum(); + let expected_guaranteed: NumBytes = canisters + .iter() + .map(|c| c.system_state.guaranteed_response_message_memory_usage()) + .sum(); + let expected_best_effort: NumBytes = canisters + .iter() + .map(|c| c.system_state.best_effort_message_memory_usage()) + .sum(); + let expected_execution: NumBytes = canisters.iter().map(|c| c.memory_allocated_bytes()).sum(); + let expected_wasm_sections: NumBytes = canisters + .iter() + .map(|c| c.wasm_custom_sections_memory_usage()) + .sum(); + let expected_history: NumBytes = canisters + .iter() + .map(|c| c.canister_history_memory_usage()) + .sum(); + + // Sanity: the hot canister contributes non-zero guaranteed-response message + // memory (from `push_input`) and both canisters contribute to `execution` + // (from `memory_allocation`); otherwise a broken aggregator returning 0 could + // trivially pass. 
+ assert!(expected_guaranteed > NumBytes::new(0)); + assert!(expected_execution == NumBytes::new(30_000_000)); + // Also ensure that message memory usage contributes a non-zero amount to + // `total_canister_memory_usage`. + let total_message_memory: NumBytes = canisters + .iter() + .map(|c| c.message_memory_usage().total()) + .sum(); + assert!(total_message_memory > NumBytes::new(0)); + // The actual `total_canister_memory_usage` only includes message memory + // (canisters have no execution state). + assert_eq!(expected_memory_usage, total_message_memory); + + assert_eq!(states.total_canister_memory_usage(), expected_memory_usage); + assert_eq!( + states.guaranteed_response_message_memory_taken(), + expected_guaranteed + ); + assert_eq!( + states.best_effort_message_memory_taken(), + expected_best_effort + ); + + let mt = states.memory_taken(); + assert_eq!(mt.execution, expected_execution); + assert_eq!(mt.guaranteed_response_messages, expected_guaranteed); + assert_eq!(mt.best_effort_messages, expected_best_effort); + assert_eq!(mt.wasm_custom_sections, expected_wasm_sections); + assert_eq!(mt.canister_history, expected_history); +} + +#[test] +fn callback_count_combines_hot_and_cold() { + fn make_callback(deadline: CoarseTime) -> Callback { + Callback::new( + CallContextId::from(1), + canister_test_id(999), + Cycles::zero(), + CompoundCycles::new(Cycles::zero(), CanisterCyclesCostSchedule::Normal), + CompoundCycles::new(Cycles::zero(), CanisterCyclesCostSchedule::Normal), + CompoundCycles::new(Cycles::zero(), CanisterCyclesCostSchedule::Normal), + WasmClosure::new(0, 0), + WasmClosure::new(0, 0), + None, + deadline, + ) + } + + // A canister with only guaranteed-response (never-expiring) callbacks is + // still cold by definition (see `CanisterState::is_cold`), which lets us + // exercise the `cold_stats.callback_count` aggregate. 
+ let mut cold = cold_canister(1); + Arc::make_mut(&mut cold) + .system_state + .register_callback(make_callback(NO_DEADLINE)) + .unwrap(); + assert!(cold.is_cold()); + + // A hot canister with two callbacks (one guaranteed, one best-effort). + let mut hot = hot_canister(2); + Arc::make_mut(&mut hot) + .system_state + .register_callback(make_callback(NO_DEADLINE)) + .unwrap(); + Arc::make_mut(&mut hot) + .system_state + .register_callback(make_callback(CoarseTime::from_secs_since_unix_epoch(1))) + .unwrap(); + assert!(!hot.is_cold()); + + let mut states = CanisterStates::default(); + states.insert(cold); + states.insert(hot); + + // Total = 1 (cold contribution, via cold_stats) + 2 (hot, via iteration). + assert_eq!(states.cold_stats.callback_count, 1); + assert_eq!(states.callback_count(), 3); } #[test] @@ -439,7 +611,7 @@ fn validate_strict_split_rejects_stale_hot_canister() { states.insert(Arc::clone(&c)); // `get_mut` promotes c to hot without actually mutating anything, so c // ends up cold-by-predicate but in the `hot` pool — exactly the stale - // state that `try_cool_all` is supposed to clean up. + // state that `try_cool_all` / `repartition` are supposed to clean up. let _ = states.get_mut(&c.canister_id()); assert_eq!(states.hot.len(), 1); assert_eq!(states.cold.len(), 0); diff --git a/rs/replicated_state/src/replicated_state.rs b/rs/replicated_state/src/replicated_state.rs index ebee68d7506b..8282410fd9ec 100644 --- a/rs/replicated_state/src/replicated_state.rs +++ b/rs/replicated_state/src/replicated_state.rs @@ -364,15 +364,15 @@ pub struct MemoryTaken { /// specified and the actual canister memory usage (including /// Wasm custom sections) where no explicit memory reservation /// has been made. - execution: NumBytes, + pub(crate) execution: NumBytes, /// Memory taken by guaranteed response canister messages or reservations. 
- guaranteed_response_messages: NumBytes, + pub(crate) guaranteed_response_messages: NumBytes, /// Memory taken by best-effort canister messages. - best_effort_messages: NumBytes, + pub(crate) best_effort_messages: NumBytes, /// Memory taken by Wasm Custom Sections. - wasm_custom_sections: NumBytes, + pub(crate) wasm_custom_sections: NumBytes, /// Memory taken by canister history. - canister_history: NumBytes, + pub(crate) canister_history: NumBytes, } impl MemoryTaken { From 893db2aea3ed7d31b8056912acd8e84e69848bec Mon Sep 17 00:00:00 2001 From: Alin Sinpalean Date: Fri, 22 May 2026 09:14:15 +0000 Subject: [PATCH 2/4] Drop redundant count and always-zero guaranteed_response_message_memory. Rename raw_memory to execution_memory, so it better matches the equivalent MemoryTaken field. Update documentation and tests. --- rs/replicated_state/src/canister_states.rs | 44 +++++++------------ .../src/canister_states/tests.rs | 29 +++++++----- 2 files changed, 32 insertions(+), 41 deletions(-) diff --git a/rs/replicated_state/src/canister_states.rs b/rs/replicated_state/src/canister_states.rs index e7cba03c7aaa..eb0244ac1e51 100644 --- a/rs/replicated_state/src/canister_states.rs +++ b/rs/replicated_state/src/canister_states.rs @@ -56,18 +56,14 @@ mod tests; /// aggregate with an `O(|hot|)` pass over hot canisters. #[derive(Clone, PartialEq, Eq, Debug, Default)] struct ColdStats { - /// Number of canisters in `cold`. Equal to `cold.len()`. - count: usize, /// Sum of `ComputeAllocation::as_percent()` across cold canisters. total_compute_allocation_percent: u64, - /// Sum of `memory_allocation().allocated_bytes(memory_usage())`. - raw_memory: NumBytes, - /// Sum of `memory_usage()`. + /// Sum of `memory_allocation().allocated_bytes(memory_usage())` (maximum of + /// memory allocation and memory usage). + execution_memory: NumBytes, + /// Sum of `memory_usage()` (actual execution memory usage, ignoring memory + /// allocation). 
memory_usage: NumBytes, - /// Sum of `system_state.guaranteed_response_message_memory_usage()`. Always - /// `0` while invariants hold (cold canisters have empty queues), but - /// tracked for symmetry / debug-time invariant checking. - guaranteed_response_message_memory: NumBytes, /// Sum of `wasm_custom_sections_memory_usage()`. wasm_custom_sections_memory: NumBytes, /// Sum of `canister_history_memory_usage()`. @@ -81,14 +77,10 @@ struct ColdStats { impl ColdStats { /// Adds the contribution of `canister` to the aggregates. fn add(&mut self, canister: &CanisterState) { - self.count += 1; self.total_compute_allocation_percent += canister.compute_allocation().as_percent(); let memory_usage = canister.memory_usage(); - self.raw_memory += canister.memory_allocation().allocated_bytes(memory_usage); + self.execution_memory += canister.memory_allocation().allocated_bytes(memory_usage); self.memory_usage += memory_usage; - self.guaranteed_response_message_memory += canister - .system_state - .guaranteed_response_message_memory_usage(); self.wasm_custom_sections_memory += canister.wasm_custom_sections_memory_usage(); self.canister_history_memory += canister.canister_history_memory_usage(); self.callback_count += canister @@ -99,14 +91,10 @@ impl ColdStats { /// Subtracts the contribution of `canister` from the aggregates. 
fn sub(&mut self, canister: &CanisterState) { - self.count -= 1; self.total_compute_allocation_percent -= canister.compute_allocation().as_percent(); let memory_usage = canister.memory_usage(); - self.raw_memory -= canister.memory_allocation().allocated_bytes(memory_usage); + self.execution_memory -= canister.memory_allocation().allocated_bytes(memory_usage); self.memory_usage -= memory_usage; - self.guaranteed_response_message_memory -= canister - .system_state - .guaranteed_response_message_memory_usage(); self.wasm_custom_sections_memory -= canister.wasm_custom_sections_memory_usage(); self.canister_history_memory -= canister.canister_history_memory_usage(); self.callback_count -= canister @@ -543,18 +531,17 @@ impl CanisterStates { /// reservations) across every canister in either pool. Does **not** /// include subnet queues. /// - /// `O(|hot canisters|)` thanks to the precomputed cold-pool aggregate. + /// `O(|hot canisters|)`. — cold canisters by definition use no + /// guaranteed-response message memory (see `CanisterState::is_cold`). pub fn guaranteed_response_message_memory_taken(&self) -> NumBytes { - let hot: NumBytes = self - .hot + self.hot .values() .map(|canister| { canister .system_state .guaranteed_response_message_memory_usage() }) - .sum(); - hot + self.cold_stats.guaranteed_response_message_memory + .sum() } /// Returns the total best-effort message memory across every canister in @@ -583,7 +570,7 @@ impl CanisterStates { pub(crate) fn memory_taken(&self) -> MemoryTaken { let ( mut execution, - mut guaranteed_response_messages, + guaranteed_response_messages, best_effort_messages, mut wasm_custom_sections, mut canister_history, @@ -613,10 +600,9 @@ impl CanisterStates { .unwrap_or_default(); let cold = &self.cold_stats; - execution += cold.raw_memory; - guaranteed_response_messages += cold.guaranteed_response_message_memory; - // `best_effort_messages` has no cold contribution: cold canisters - // have empty queues. 
+ execution += cold.execution_memory; + // `guaranteed_response_messages` and `best_effort_messages` have no cold + // contribution: cold canisters have empty queues. wasm_custom_sections += cold.wasm_custom_sections_memory; canister_history += cold.canister_history_memory; diff --git a/rs/replicated_state/src/canister_states/tests.rs b/rs/replicated_state/src/canister_states/tests.rs index f130d18f6b5e..0691980dfc7a 100644 --- a/rs/replicated_state/src/canister_states/tests.rs +++ b/rs/replicated_state/src/canister_states/tests.rs @@ -45,6 +45,11 @@ fn push_input(canister: &mut Arc<CanisterState>) { .unwrap(); } +fn set_compute_allocation(canister: &mut Arc<CanisterState>, percentage: u64) { + Arc::make_mut(canister).system_state.compute_allocation = + ComputeAllocation::try_from(percentage).unwrap(); +} + fn hot_canister(id: u64) -> Arc<CanisterState> { let mut canister = make_canister(id); push_input(&mut canister); @@ -129,30 +134,33 @@ fn insert_classifies_on_the_fly() { #[test] fn insert_replaces_existing_canister_and_updates_cold_stats() { let mut states = CanisterStates::default(); - let cold = cold_canister(1); // Initial insert: lands in cold. + let mut cold = cold_canister(1); + set_compute_allocation(&mut cold, 42); assert!(states.insert(Arc::clone(&cold)).is_none()); assert_eq!(states.hot.len(), 0); assert_eq!(states.cold.len(), 1); - assert_eq!(states.cold_stats.count, 1); + assert_eq!(states.cold_stats.total_compute_allocation_percent, 42); // Replace with a hot version of the same canister: returns the cold one, // `cold_stats` reflects the new (empty) cold pool.
- let hot = hot_canister(1); + let mut hot = hot_canister(1); + set_compute_allocation(&mut hot, 42); let prev = states.insert(Arc::clone(&hot)).expect("upsert"); assert!(Arc::ptr_eq(&prev, &cold)); assert_eq!(states.hot.len(), 1); assert_eq!(states.cold.len(), 0); - assert_eq!(states.cold_stats.count, 0); + assert_eq!(states.cold_stats.total_compute_allocation_percent, 0); // Replace again with a cold version: cold_stats picks it back up. - let cold_again = cold_canister(1); + let mut cold_again = cold_canister(1); + set_compute_allocation(&mut cold_again, 42); let prev = states.insert(Arc::clone(&cold_again)).expect("upsert"); assert!(Arc::ptr_eq(&prev, &hot)); assert_eq!(states.hot.len(), 0); assert_eq!(states.cold.len(), 1); - assert_eq!(states.cold_stats.count, 1); + assert_eq!(states.cold_stats.total_compute_allocation_percent, 42); } #[test] @@ -275,8 +283,7 @@ fn for_each_mut_demotes_a_hot_canister_that_became_cold() { // c1: hot (has an input message). Tag it with a distinguishable compute // allocation so that we can verify it lands in `cold_stats` after demotion. let mut c1 = hot_canister(1); - Arc::make_mut(&mut c1).system_state.compute_allocation = - ComputeAllocation::try_from(42).unwrap(); + set_compute_allocation(&mut c1, 42); // c2: already cold, kept untouched as a partition witness. let c2 = cold_canister(2); @@ -322,8 +329,7 @@ fn for_each_mut_updates_cold_stats_in_place() { // Mutate `compute_allocation` on the cold canister without affecting // `is_cold()`. The in-place iteration must update `cold_stats` accordingly. states.for_each_mut(|_id, canister| { - Arc::make_mut(canister).system_state.compute_allocation = - ComputeAllocation::try_from(42).unwrap(); + set_compute_allocation(canister, 42); }); assert_eq!(states.hot.len(), 0); @@ -443,8 +449,7 @@ fn retain_updates_cold_stats_for_removed_cold_canisters() { // Give every canister a distinguishable compute allocation so we can spot // bookkeeping errors via `total_compute_allocation()`. 
for (i, canister) in [&mut c1, &mut c2, &mut c3, &mut c4].into_iter().enumerate() { - Arc::make_mut(canister).system_state.compute_allocation = - ComputeAllocation::try_from(10 + 10 * i as u64).unwrap(); + set_compute_allocation(canister, 10 + 10 * i as u64); } states.insert(Arc::clone(&c1)); states.insert(Arc::clone(&c2)); From 3b04b73293c21b25dcc894f8d5921bff3a49b264 Mon Sep 17 00:00:00 2001 From: Alin Sinpalean Date: Fri, 22 May 2026 09:43:31 +0000 Subject: [PATCH 3/4] Restore `guaranteed_response_message_memory` in `ColdStats` A canister can satisfy `CanisterState::is_cold()` while still holding a guaranteed-response slot reservation: `is_cold()` only requires empty input/output *messages* (the pool count) and no unexpired best-effort callback, both of which are independent of whether the canister has in-flight guaranteed-response requests. A canister that has pushed a guaranteed-response request that's already been moved to an outgoing stream still keeps the input-slot reservation for the eventual response, which contributes `MAX_RESPONSE_COUNT_BYTES` to its `guaranteed_response_message_memory_usage()`. The previous commit dropped this field from `ColdStats` on the assumption it was always zero. It isn't, and the consequence is that `guaranteed_response_message_memory_taken()` quietly under-reports subnet-wide memory: promoting a cold canister with a reservation to `hot` (e.g. on the next `get_mut`) makes the subnet total jump up out of nowhere, breaking conservation invariants in downstream code (stream handler `debug_assert!`s, in particular). Restore the field and the corresponding `add`/`sub` bookkeeping, fold it into `guaranteed_response_message_memory_taken`, `total_canister_memory_usage`, and `memory_taken`, and add a focused test (`cold_canister_with_guaranteed_response_reservation_is_aggregated`) exercising the case via `push_output_request` followed by draining the output queue. 
Best-effort message memory remains hot-only: an unexpired best-effort callback forces the canister into `hot`, and any expired best-effort callback shows up as a pending input which also forces `hot`. Co-authored-by: Cursor --- rs/replicated_state/src/canister_states.rs | 47 +++++++++---- .../src/canister_states/tests.rs | 70 ++++++++++++++++++- 2 files changed, 104 insertions(+), 13 deletions(-) diff --git a/rs/replicated_state/src/canister_states.rs b/rs/replicated_state/src/canister_states.rs index eb0244ac1e51..97b764209a69 100644 --- a/rs/replicated_state/src/canister_states.rs +++ b/rs/replicated_state/src/canister_states.rs @@ -64,6 +64,13 @@ struct ColdStats { /// Sum of `memory_usage()` (actual execution memory usage, ignoring memory /// allocation). memory_usage: NumBytes, + /// Sum of `system_state.guaranteed_response_message_memory_usage()`. + /// + /// Cold canisters have no enqueued messages by `is_cold()`, but they can still + /// hold reservations for in-flight guaranteed-response requests, which + /// contribute to `guaranteed_response_message_memory_usage()`. We therefore + /// have to track this separately, instead of assuming it is zero. + guaranteed_response_message_memory: NumBytes, /// Sum of `wasm_custom_sections_memory_usage()`. wasm_custom_sections_memory: NumBytes, /// Sum of `canister_history_memory_usage()`. 
@@ -81,6 +88,9 @@ impl ColdStats { let memory_usage = canister.memory_usage(); self.execution_memory += canister.memory_allocation().allocated_bytes(memory_usage); self.memory_usage += memory_usage; + self.guaranteed_response_message_memory += canister + .system_state + .guaranteed_response_message_memory_usage(); self.wasm_custom_sections_memory += canister.wasm_custom_sections_memory_usage(); self.canister_history_memory += canister.canister_history_memory_usage(); self.callback_count += canister @@ -95,6 +105,9 @@ impl ColdStats { let memory_usage = canister.memory_usage(); self.execution_memory -= canister.memory_allocation().allocated_bytes(memory_usage); self.memory_usage -= memory_usage; + self.guaranteed_response_message_memory -= canister + .system_state + .guaranteed_response_message_memory_usage(); self.wasm_custom_sections_memory -= canister.wasm_custom_sections_memory_usage(); self.canister_history_memory -= canister.canister_history_memory_usage(); self.callback_count -= canister @@ -522,33 +535,40 @@ impl CanisterStates { .values() .map(|canister| canister.memory_usage() + canister.message_memory_usage().total()) .sum(); - // Cold canisters have empty queues by `CanisterState::is_cold`, so they - // contribute no message memory usage. - hot + self.cold_stats.memory_usage + // Cold canisters contribute their execution memory plus any + // guaranteed-response message memory (reservations). They never have + // best-effort message memory: an unexpired best-effort callback would + // force the canister into `hot`. + hot + self.cold_stats.memory_usage + self.cold_stats.guaranteed_response_message_memory } /// Returns the total guaranteed-response message memory (including /// reservations) across every canister in either pool. Does **not** /// include subnet queues. /// - /// `O(|hot canisters|)`. — cold canisters by definition use no - /// guaranteed-response message memory (see `CanisterState::is_cold`). 
+ /// `O(|hot canisters|)` thanks to the precomputed cold-pool aggregate. Cold + /// canisters can still hold input-slot reservations for in-flight requests, + /// so the cold contribution is not always zero. pub fn guaranteed_response_message_memory_taken(&self) -> NumBytes { - self.hot + let hot: NumBytes = self + .hot .values() .map(|canister| { canister .system_state .guaranteed_response_message_memory_usage() }) - .sum() + .sum(); + hot + self.cold_stats.guaranteed_response_message_memory } /// Returns the total best-effort message memory across every canister in /// either pool. Does **not** include subnet queues. /// - /// `O(|hot canisters|)` — cold canisters by definition use no best-effort - /// message memory (see `CanisterState::is_cold`). + /// `O(|hot canisters|)`: by `CanisterState::is_cold`, a canister with an + /// unexpired best-effort callback or with any pending input (including an + /// expired best-effort callback) is forced into `hot`, so the cold pool + /// contributes nothing here. pub fn best_effort_message_memory_taken(&self) -> NumBytes { self.hot .values() @@ -570,7 +590,7 @@ impl CanisterStates { pub(crate) fn memory_taken(&self) -> MemoryTaken { let ( mut execution, - guaranteed_response_messages, + mut guaranteed_response_messages, best_effort_messages, mut wasm_custom_sections, mut canister_history, @@ -601,8 +621,11 @@ impl CanisterStates { let cold = &self.cold_stats; execution += cold.execution_memory; - // `guaranteed_response_messages` and `best_effort_messages` have no cold - // contribution: cold canisters have empty queues. + guaranteed_response_messages += cold.guaranteed_response_message_memory; + // `best_effort_messages` has no cold contribution: an unexpired + // best-effort callback forces the canister into `hot`, and any expired + // best-effort callback shows up as a pending input, which also forces + // the canister into `hot`. 
wasm_custom_sections += cold.wasm_custom_sections_memory; canister_history += cold.canister_history_memory; diff --git a/rs/replicated_state/src/canister_states/tests.rs b/rs/replicated_state/src/canister_states/tests.rs index 0691980dfc7a..83868148dd9e 100644 --- a/rs/replicated_state/src/canister_states/tests.rs +++ b/rs/replicated_state/src/canister_states/tests.rs @@ -7,7 +7,7 @@ use ic_test_utilities_types::ids::canister_test_id; use ic_test_utilities_types::messages::RequestBuilder; use ic_types::messages::{CallContextId, NO_DEADLINE, RequestOrResponse}; use ic_types::methods::{Callback, WasmClosure}; -use ic_types::time::CoarseTime; +use ic_types::time::{CoarseTime, UNIX_EPOCH}; use ic_types::{ComputeAllocation, NumBytes}; use ic_types_cycles::{CanisterCyclesCostSchedule, CompoundCycles, Cycles}; use std::sync::Arc; @@ -60,6 +60,27 @@ fn cold_canister(id: u64) -> Arc<CanisterState> { make_canister(id) } +/// Builds a canister that satisfies [`CanisterState::is_cold`] but holds an +/// input-slot reservation for an outstanding guaranteed-response request. Such +/// canisters contribute to `cold_stats.guaranteed_response_message_memory`. +fn cold_canister_with_guaranteed_response_reservation(id: u64) -> Arc<CanisterState> { + let mut canister = make_canister(id); + // Push an output request, then drain the output queue. The request itself + // leaves the queue but the reservation made in the input queue for the + // eventual response stays behind. This matches what `routing` does when it + // moves an outgoing request from a canister's output queue into a stream. + let request = ic_test_utilities_types::messages::RequestBuilder::default() + .sender(canister.canister_id()) + .receiver(canister_test_id(999)) + .build(); + Arc::make_mut(&mut canister) + .push_output_request(Arc::new(request), UNIX_EPOCH) + .unwrap(); + // Drain the output queue to leave only the reservation behind.
+ let _ = Arc::make_mut(&mut canister).output_into_iter().count(); + canister +} + #[test] fn fresh_canister_is_cold() { let canister = cold_canister(1); @@ -550,6 +571,53 @@ fn memory_aggregators_combine_hot_and_cold() { assert_eq!(mt.canister_history, expected_history); } +/// A cold canister holding a guaranteed-response slot reservation must still +/// contribute its reservation memory to +/// `guaranteed_response_message_memory_taken` and `memory_taken`. Otherwise +/// promoting that canister to `hot` (e.g. on the next `get_mut`) would make +/// the subnet-wide aggregate jump up out of nowhere, breaking conservation +/// invariants in downstream code (e.g. the stream handler). +#[test] +fn cold_canister_with_guaranteed_response_reservation_is_aggregated() { + let canister = cold_canister_with_guaranteed_response_reservation(1); + // Sanity: the test fixture is what it claims to be. + assert!(canister.is_cold(), "fixture is not cold"); + let canister_guaranteed = canister + .system_state + .guaranteed_response_message_memory_usage(); + assert!( + canister_guaranteed > NumBytes::new(0), + "fixture has no guaranteed-response reservation", + ); + + let mut states = CanisterStates::default(); + states.insert(Arc::clone(&canister)); + assert_eq!(states.hot.len(), 0); + assert_eq!(states.cold.len(), 1); + + // The canister sits in `cold` but contributes a non-zero amount to both + // `guaranteed_response_message_memory_taken` and `memory_taken`. + assert_eq!( + states.guaranteed_response_message_memory_taken(), + canister_guaranteed, + ); + assert_eq!( + states.memory_taken().guaranteed_response_messages, + canister_guaranteed, + ); + + // Promoting the canister to `hot` (e.g. via `get_mut`) must not change the + // aggregate. 
+ let _ = states.get_mut(&canister.canister_id()); + assert_eq!(states.hot.len(), 1); + assert_eq!(states.cold.len(), 0); + assert_eq!( + states.guaranteed_response_message_memory_taken(), + canister_guaranteed, + "aggregate must be conserved across hot/cold promotion", + ); +} + #[test] fn callback_count_combines_hot_and_cold() { fn make_callback(deadline: CoarseTime) -> Callback { From d90e0ca1a1245e41a7025c10a05501eadce719aa Mon Sep 17 00:00:00 2001 From: Alin Sinpalean Date: Fri, 22 May 2026 10:39:35 +0000 Subject: [PATCH 4/4] Cut down on the verbosity of AI-generated comments. --- rs/replicated_state/src/canister_states.rs | 25 ++++++------------ .../src/canister_states/tests.rs | 26 ++++++------------- 2 files changed, 16 insertions(+), 35 deletions(-) diff --git a/rs/replicated_state/src/canister_states.rs b/rs/replicated_state/src/canister_states.rs index 97b764209a69..c0faa32d711c 100644 --- a/rs/replicated_state/src/canister_states.rs +++ b/rs/replicated_state/src/canister_states.rs @@ -67,9 +67,8 @@ struct ColdStats { /// Sum of `system_state.guaranteed_response_message_memory_usage()`. /// /// Cold canisters have no enqueued messages by `is_cold()`, but they can still - /// hold reservations for in-flight guaranteed-response requests, which - /// contribute to `guaranteed_response_message_memory_usage()`. We therefore - /// have to track this separately, instead of assuming it is zero. + /// hold guaranteed-response reservations, contributing to guaranteed-response + /// message memory usage. guaranteed_response_message_memory: NumBytes, /// Sum of `wasm_custom_sections_memory_usage()`. wasm_custom_sections_memory: NumBytes, @@ -536,9 +535,7 @@ impl CanisterStates { .map(|canister| canister.memory_usage() + canister.message_memory_usage().total()) .sum(); // Cold canisters contribute their execution memory plus any - // guaranteed-response message memory (reservations). 
They never have - // best-effort message memory: an unexpired best-effort callback would - // force the canister into `hot`. + // guaranteed-response message memory (reservations). hot + self.cold_stats.memory_usage + self.cold_stats.guaranteed_response_message_memory } @@ -546,9 +543,7 @@ impl CanisterStates { /// reservations) across every canister in either pool. Does **not** /// include subnet queues. /// - /// `O(|hot canisters|)` thanks to the precomputed cold-pool aggregate. Cold - /// canisters can still hold input-slot reservations for in-flight requests, - /// so the cold contribution is not always zero. + /// `O(|hot canisters|)` thanks to the precomputed cold-pool aggregate. pub fn guaranteed_response_message_memory_taken(&self) -> NumBytes { let hot: NumBytes = self .hot @@ -565,10 +560,8 @@ impl CanisterStates { /// Returns the total best-effort message memory across every canister in /// either pool. Does **not** include subnet queues. /// - /// `O(|hot canisters|)`: by `CanisterState::is_cold`, a canister with an - /// unexpired best-effort callback or with any pending input (including an - /// expired best-effort callback) is forced into `hot`, so the cold pool - /// contributes nothing here. + /// `O(|hot canisters|)` — cold canisters by definition use no best-effort + /// message memory (see `CanisterState::is_cold`). pub fn best_effort_message_memory_taken(&self) -> NumBytes { self.hot .values() @@ -622,10 +615,8 @@ impl CanisterStates { let cold = &self.cold_stats; execution += cold.execution_memory; guaranteed_response_messages += cold.guaranteed_response_message_memory; - // `best_effort_messages` has no cold contribution: an unexpired - // best-effort callback forces the canister into `hot`, and any expired - // best-effort callback shows up as a pending input, which also forces - // the canister into `hot`. + // `best_effort_messages` has no cold contribution: cold canisters have no + // messages in their queues. 
wasm_custom_sections += cold.wasm_custom_sections_memory; canister_history += cold.canister_history_memory; diff --git a/rs/replicated_state/src/canister_states/tests.rs b/rs/replicated_state/src/canister_states/tests.rs index 83868148dd9e..1c1c09891af5 100644 --- a/rs/replicated_state/src/canister_states/tests.rs +++ b/rs/replicated_state/src/canister_states/tests.rs @@ -65,10 +65,7 @@ fn cold_canister(id: u64) -> Arc<CanisterState> { /// canisters contribute to `cold_stats.guaranteed_response_message_memory`. fn cold_canister_with_guaranteed_response_reservation(id: u64) -> Arc<CanisterState> { let mut canister = make_canister(id); - // Push an output request, then drain the output queue. The request itself - // leaves the queue but the reservation made in the input queue for the - // eventual response stays behind. This matches what `routing` does when it - // moves an outgoing request from a canister's output queue into a stream. + // Push an output request, making an input queue slot reservation. let request = ic_test_utilities_types::messages::RequestBuilder::default() .sender(canister.canister_id()) .receiver(canister_test_id(999)) @@ -573,22 +570,16 @@ fn memory_aggregators_combine_hot_and_cold() { /// A cold canister holding a guaranteed-response slot reservation must still /// contribute its reservation memory to -/// `guaranteed_response_message_memory_taken` and `memory_taken`. Otherwise -/// promoting that canister to `hot` (e.g. on the next `get_mut`) would make -/// the subnet-wide aggregate jump up out of nowhere, breaking conservation -/// invariants in downstream code (e.g. the stream handler). +/// `guaranteed_response_message_memory_taken` and `memory_taken`. #[test] fn cold_canister_with_guaranteed_response_reservation_is_aggregated() { let canister = cold_canister_with_guaranteed_response_reservation(1); // Sanity: the test fixture is what it claims to be.
- assert!(canister.is_cold(), "fixture is not cold"); - let canister_guaranteed = canister + assert!(canister.is_cold()); + let guaranteed_response_message_memory = canister .system_state .guaranteed_response_message_memory_usage(); - assert!( - canister_guaranteed > NumBytes::new(0), - "fixture has no guaranteed-response reservation", - ); + assert!(guaranteed_response_message_memory > NumBytes::new(0),); let mut states = CanisterStates::default(); states.insert(Arc::clone(&canister)); @@ -599,11 +590,11 @@ fn cold_canister_with_guaranteed_response_reservation_is_aggregated() { // `guaranteed_response_message_memory_taken` and `memory_taken`. assert_eq!( states.guaranteed_response_message_memory_taken(), - canister_guaranteed, + guaranteed_response_message_memory, ); assert_eq!( states.memory_taken().guaranteed_response_messages, - canister_guaranteed, + guaranteed_response_message_memory, ); // Promoting the canister to `hot` (e.g. via `get_mut`) must not change the @@ -613,8 +604,7 @@ fn cold_canister_with_guaranteed_response_reservation_is_aggregated() { assert_eq!(states.cold.len(), 0); assert_eq!( states.guaranteed_response_message_memory_taken(), - canister_guaranteed, - "aggregate must be conserved across hot/cold promotion", + guaranteed_response_message_memory, ); }