diff --git a/rs/replicated_state/src/canister_states.rs b/rs/replicated_state/src/canister_states.rs index 1113df379c64..c0faa32d711c 100644 --- a/rs/replicated_state/src/canister_states.rs +++ b/rs/replicated_state/src/canister_states.rs @@ -14,12 +14,17 @@ //! [`CanisterStates::try_cool`] for single canisters or //! [`CanisterStates::try_cool_all`] for a bulk pass. //! -//! This module currently only provides the partitioning machinery. The -//! `ReplicatedState` integration and the `O(1)` cold-pool aggregates are -//! introduced in follow-up changes; right now `CanisterStates` is not yet -//! wired into `ReplicatedState`. +//! Internally, `CanisterStates` also maintains `ColdStats`, a small set of +//! aggregates over the cold pool that lets several aggregated queries (e.g. +//! [`CanisterStates::total_compute_allocation`], +//! [`CanisterStates::total_canister_memory_usage`], +//! [`CanisterStates::callback_count`]) become `O(|hot|)` instead of +//! `O(|all canisters|)`. These aggregates are an implementation detail: callers +//! always go through the public aggregator methods. use crate::CanisterState; +use crate::replicated_state::MemoryTaken; +use ic_base_types::NumBytes; use ic_types::CanisterId; use ic_validate_eq::ValidateEq; use ic_validate_eq_derive::ValidateEq; @@ -31,6 +36,100 @@ use std::sync::Arc; #[cfg(test)] mod tests; +/// O(1) aggregates over the canisters currently in the `cold` pool. +/// +/// Maintained incrementally: every transition into / out of `cold` adds or +/// subtracts the contributing canister's values. The aggregates here are +/// expected to be a small enough set that the bookkeeping cost on each +/// transition is constant time and the resulting cost savings on the +/// `O(|all canisters|)` aggregate computations in `ReplicatedState` are +/// significant. +/// +/// Crucially, `ColdStats` is *derived* state: it is recomputable from the +/// `cold` map at any time and is **not** persisted in checkpoints. It is +/// reconstructed when canisters are inserted (e.g. on checkpoint load). +/// +/// Private to the module: external callers reach the same totals through +/// [`CanisterStates::total_compute_allocation`], +/// [`CanisterStates::total_canister_memory_usage`] and +/// [`CanisterStates::callback_count`], which transparently combine the cold +/// aggregate with an `O(|hot|)` pass over hot canisters. +#[derive(Clone, PartialEq, Eq, Debug, Default)] +struct ColdStats { + /// Sum of `ComputeAllocation::as_percent()` across cold canisters. + total_compute_allocation_percent: u64, + /// Sum of `memory_allocation().allocated_bytes(memory_usage())` (maximum of + /// memory allocation and memory usage). + execution_memory: NumBytes, + /// Sum of `memory_usage()` (actual execution memory usage, ignoring memory + /// allocation). + memory_usage: NumBytes, + /// Sum of `system_state.guaranteed_response_message_memory_usage()`. + /// + /// Cold canisters have no enqueued messages by `is_cold()`, but they can still + /// hold guaranteed-response reservations, contributing to guaranteed-response + /// message memory usage. + guaranteed_response_message_memory: NumBytes, + /// Sum of `wasm_custom_sections_memory_usage()`. + wasm_custom_sections_memory: NumBytes, + /// Sum of `canister_history_memory_usage()`. + canister_history_memory: NumBytes, + /// Sum of `ccm.unresponded_callback_count()` (which for cold canisters + /// equals the number of guaranteed-response callbacks; best-effort + /// callbacks force the canister into `hot`). + callback_count: usize, +} + +impl ColdStats { + /// Adds the contribution of `canister` to the aggregates. + fn add(&mut self, canister: &CanisterState) { + self.total_compute_allocation_percent += canister.compute_allocation().as_percent(); + let memory_usage = canister.memory_usage(); + self.execution_memory += canister.memory_allocation().allocated_bytes(memory_usage); + self.memory_usage += memory_usage; + self.guaranteed_response_message_memory += canister + .system_state + .guaranteed_response_message_memory_usage(); + self.wasm_custom_sections_memory += canister.wasm_custom_sections_memory_usage(); + self.canister_history_memory += canister.canister_history_memory_usage(); + self.callback_count += canister + .system_state + .call_context_manager() + .map_or(0, |ccm| ccm.unresponded_callback_count()); + } + + /// Subtracts the contribution of `canister` from the aggregates. + fn sub(&mut self, canister: &CanisterState) { + self.total_compute_allocation_percent -= canister.compute_allocation().as_percent(); + let memory_usage = canister.memory_usage(); + self.execution_memory -= canister.memory_allocation().allocated_bytes(memory_usage); + self.memory_usage -= memory_usage; + self.guaranteed_response_message_memory -= canister + .system_state + .guaranteed_response_message_memory_usage(); + self.wasm_custom_sections_memory -= canister.wasm_custom_sections_memory_usage(); + self.canister_history_memory -= canister.canister_history_memory_usage(); + self.callback_count -= canister + .system_state + .call_context_manager() + .map_or(0, |ccm| ccm.unresponded_callback_count()); + } + + /// Computes `ColdStats` from scratch over the provided cold canisters. + /// Used to (re-)derive the aggregates, e.g. after loading from checkpoint + /// and as a `debug_assert!` sanity check inside `debug_assert_invariants`. + fn recompute<'a, I>(cold: I) -> Self + where + I: IntoIterator>, + { + let mut stats = ColdStats::default(); + for c in cold { + stats.add(c.as_ref()); + } + stats + } +} + /// Hot/cold-partitioned collection of canister states. /// /// See the module-level docs for the overall design. The two underlying @@ -39,8 +138,8 @@ mod tests; /// a flat `BTreeMap>` would. /// /// `PartialEq` and `ValidateEq` are derived: two `CanisterStates` are equal iff -/// they have the same partition (hot vs. cold). This makes the partition -/// observable through equality assertions in tests. +/// they have the same partition (hot vs. cold) and the same `cold_stats`. This +/// makes the partition observable through equality assertions in tests. /// /// # Invariants /// @@ -48,13 +147,16 @@ mod tests; /// checked in debug builds by `debug_assert_invariants`: /// /// 1. `hot` and `cold` pools are disjoint (no canister ID in both); -/// 2. every canister in the `cold` pool satisfies `CanisterState::is_cold()`. +/// 2. every canister in the `cold` pool satisfies `CanisterState::is_cold()`; +/// 3. `cold_stats` matches a fresh recomputation over the `cold` pool. /// /// Additionally, the **strict** partition invariant — that every canister in /// the `hot` pool does *not* satisfy `is_cold()` — holds after -/// [`Self::try_cool_all`] and is verified by [`Self::validate_strict_split`]. -/// Between repartitioning passes the `hot` pool may contain canisters that -/// have gone quiet but have not yet been demoted; this is by design. +/// [`Self::try_cool_all`] / +/// [`crate::ReplicatedState::repartition_canister_states`], and is verified +/// during checkpoint validation by [`Self::validate_strict_split`]. Between +/// repartitioning passes the `hot` pool may contain canisters that have gone +/// quiet but have not yet been demoted; this is by design. #[derive(Clone, Debug, Default, PartialEq, ValidateEq)] pub struct CanisterStates { /// Canisters that may have round-level work or are recently active. Always @@ -68,6 +170,9 @@ pub struct CanisterStates { /// need to consider them; per-round operations should skip them. #[validate_eq(CompareWithValidateEq)] cold: BTreeMap>, + + /// O(1) aggregates over `cold` canisters. See [`ColdStats`]. + cold_stats: ColdStats, } impl CanisterStates { @@ -79,14 +184,20 @@ impl CanisterStates { pub fn new(canisters: BTreeMap>) -> Self { let mut hot = BTreeMap::new(); let mut cold = BTreeMap::new(); + let mut cold_stats = ColdStats::default(); for (id, canister) in canisters { if canister.is_cold() { + cold_stats.add(canister.as_ref()); cold.insert(id, canister); } else { hot.insert(id, canister); } } - let states = Self { hot, cold }; + let states = Self { + hot, + cold, + cold_stats, + }; states.debug_assert_invariants(); states } @@ -98,7 +209,8 @@ impl CanisterStates { } /// Returns a mutable reference to the `Arc` in `hot`. If the - /// canister is currently in `cold`, it is first promoted (moved to `hot`). + /// canister is currently in `cold`, it is first promoted (moved to `hot`, with + /// `cold_stats` updated accordingly). /// /// This is the back door used by all mutating accessors on `ReplicatedState`. /// Anyone holding the returned `&mut Arc` may freely mutate the @@ -115,7 +227,8 @@ impl CanisterStates { Entry::Vacant(entry) => { if let Some(canister) = self.cold.remove(id) { - // Was in the `cold` pool, promote it. + // Was in the `cold` pool, update the stats and promote it. + self.cold_stats.sub(canister.as_ref()); let canister = entry.insert(canister); // Unfortunately, the borrow checker won't let us do this here. // self.debug_assert_invariants(); @@ -144,15 +257,16 @@ impl CanisterStates { } /// Inserts a canister into the appropriate pool. If a canister with this - /// ID was already present (in either pool), it is replaced and returned. + /// ID was already present (in either pool), it is replaced and returned; + /// `cold_stats` is adjusted accordingly. pub fn insert(&mut self, canister: Arc) -> Option> { + // Drop any previous entry first so `cold_stats` doesn't double-count + // when we transition from / to the cold pool. let id = canister.canister_id(); - // Drop any previous entry first so that the partition reflects the - // freshly-inserted canister's `is_cold()` regardless of where the - // old entry lived. let prev = self.remove(&id); if canister.is_cold() { + self.cold_stats.add(canister.as_ref()); self.cold.insert(id, canister); } else { self.hot.insert(id, canister); @@ -162,22 +276,30 @@ impl CanisterStates { } /// Removes and returns the canister with the given ID from whichever pool - /// it is in. + /// it is in. Updates `cold_stats` if the canister was in `cold`. pub fn remove(&mut self, id: &CanisterId) -> Option> { - let removed = self.hot.remove(id).or_else(|| self.cold.remove(id)); + let removed = if let Some(canister) = self.hot.remove(id) { + Some(canister) + } else if let Some(canister) = self.cold.remove(id) { + self.cold_stats.sub(canister.as_ref()); + Some(canister) + } else { + None + }; self.debug_assert_invariants(); removed } /// Re-evaluates `is_cold()` for the given canister and, if true, moves the - /// canister from `hot` to `cold`. No-op if the canister is not present, - /// already in `cold`, or not cold. + /// canister from `hot` to `cold`, updating `cold_stats`. No-op if the canister + /// is not present, already in `cold`, or not cold. /// /// Returns true iff a transition (hot → cold) actually happened. pub fn try_cool(&mut self, id: &CanisterId) -> bool { let cooled = match self.hot.entry(*id) { Entry::Occupied(entry) if entry.get().is_cold() => { let canister = entry.remove(); + self.cold_stats.add(canister.as_ref()); self.cold.insert(*id, canister); true } @@ -210,6 +332,7 @@ impl CanisterStates { .collect(); for id in to_cool { let canister = self.hot.remove(&id).unwrap(); + self.cold_stats.add(canister.as_ref()); self.cold.insert(id, canister); } self.debug_assert_invariants(); @@ -255,15 +378,18 @@ impl CanisterStates { } /// Visits every canister (hot and cold) and runs `f` against it, then - /// re-establishes the hot/cold partition. + /// re-establishes strict hot/cold partitioning. /// /// This is the safe way to perform "touch every canister" loops (storage /// charging, checkpoint write-out, …) that may legitimately mutate cold - /// canisters in ways affecting [`CanisterState::is_cold`]. + /// canisters in ways affecting [`CanisterState::is_cold`] *or* any field + /// aggregated in `ColdStats` (`compute_allocation`, `memory_allocation`, + /// canister history size, callbacks). /// /// Iterates the hot pool followed by the cold pool — i.e. canisters are /// **not** yielded in `CanisterId` order — but every canister is visited - /// exactly once. Cost is `O(|hot| + |cold|)`. + /// exactly once. Cost is `O(|hot| + |cold|)`, with a small constant per + /// cold canister for the `cold_stats` bookkeeping. pub fn for_each_mut(&mut self, mut f: F) where F: FnMut(&CanisterId, &mut Arc), @@ -274,15 +400,22 @@ impl CanisterStates { f(id, canister); } - // Cold pool: iterate in place. The closure may flip the canister to - // non-cold, in which case we promote it to `hot` after the loop. + // Cold pool: iterate in place. Because the closure can mutate fields + // that affect `cold_stats` or `is_cold()`, we conservatively subtract + // each canister's pre-call contribution before running `f`, then + // either re-add (if it stays cold) or queue it for promotion to + // `hot` (if `f` flipped it into a non-cold state). let mut to_promote: Vec = Vec::new(); for (id, canister) in self.cold.iter_mut() { + self.cold_stats.sub(canister.as_ref()); f(id, canister); - if !canister.is_cold() { + if canister.is_cold() { + self.cold_stats.add(canister.as_ref()); + } else { to_promote.push(*id); } } + // Promote all canisters that are now hot. for id in to_promote { let canister = self.cold.remove(&id).unwrap(); self.hot.insert(id, canister); @@ -310,13 +443,16 @@ impl CanisterStates { } // Cold pool: same in-place strategy as `for_each_mut`, but short-circuit - // on `Err`. We always preserve the partition for the canister we were - // visiting when the error occurred, then propagate the error. + // on `Err`. We always restore the partition / stats for the canister we + // were visiting when the error occurred, then propagate the error. let mut to_promote: Vec = Vec::new(); if result.is_ok() { for (id, canister) in self.cold.iter_mut() { + self.cold_stats.sub(canister.as_ref()); let res = f(id, canister); - if !canister.is_cold() { + if canister.is_cold() { + self.cold_stats.add(canister.as_ref()); + } else { to_promote.push(*id); } if let Err(e) = res { @@ -325,6 +461,7 @@ impl CanisterStates { } } } + // Promote all canisters that are now hot. for id in to_promote { let canister = self.cold.remove(&id).unwrap(); self.hot.insert(id, canister); @@ -336,7 +473,8 @@ impl CanisterStates { result } - /// Retains only the canisters for which the predicate returns true. + /// Retains only the canisters for which the predicate returns true, updating + /// `cold_stats` to account for any cold canister that was removed. /// /// Iterates both pools. pub fn retain(&mut self, f: F) @@ -344,10 +482,153 @@ impl CanisterStates { F: Fn(&CanisterId, &Arc) -> bool, { self.hot.retain(|id, c| f(id, c)); - self.cold.retain(|id, c| f(id, c)); + self.cold.retain(|id, c| { + if f(id, c) { + true + } else { + self.cold_stats.sub(c.as_ref()); + false + } + }); self.debug_assert_invariants(); } + /// Returns the total reserved compute allocation (as a sum of percentage + /// points) across every canister in either pool. + /// + /// `O(|hot canisters|)` thanks to the precomputed cold-pool aggregate. + pub fn total_compute_allocation(&self) -> u64 { + let hot: u64 = self + .hot + .values() + .map(|canister| canister.compute_allocation().as_percent()) + .sum(); + hot + self.cold_stats.total_compute_allocation_percent + } + + /// Returns the total number of callbacks (responded and unresponded) + /// registered across every canister in either pool. + /// + /// `O(|hot canisters|)` thanks to the precomputed cold-pool aggregate. + pub fn callback_count(&self) -> usize { + let hot: usize = self + .hot + .values() + .map(|canister| { + canister + .system_state + .call_context_manager() + .map_or(0, |ccm| ccm.unresponded_callback_count()) + }) + .sum(); + hot + self.cold_stats.callback_count + } + + /// Returns the total memory usage of all canisters in either pool, including + /// message memory. + /// + /// `O(|hot canisters|)` thanks to the precomputed cold-pool aggregate. + pub fn total_canister_memory_usage(&self) -> NumBytes { + let hot: NumBytes = self + .hot + .values() + .map(|canister| canister.memory_usage() + canister.message_memory_usage().total()) + .sum(); + // Cold canisters contribute their execution memory plus any + // guaranteed-response message memory (reservations). + hot + self.cold_stats.memory_usage + self.cold_stats.guaranteed_response_message_memory + } + + /// Returns the total guaranteed-response message memory (including + /// reservations) across every canister in either pool. Does **not** + /// include subnet queues. + /// + /// `O(|hot canisters|)` thanks to the precomputed cold-pool aggregate. + pub fn guaranteed_response_message_memory_taken(&self) -> NumBytes { + let hot: NumBytes = self + .hot + .values() + .map(|canister| { + canister + .system_state + .guaranteed_response_message_memory_usage() + }) + .sum(); + hot + self.cold_stats.guaranteed_response_message_memory + } + + /// Returns the total best-effort message memory across every canister in + /// either pool. Does **not** include subnet queues. + /// + /// `O(|hot canisters|)` — cold canisters by definition use no best-effort + /// message memory (see `CanisterState::is_cold`). + pub fn best_effort_message_memory_taken(&self) -> NumBytes { + self.hot + .values() + .map(|canister| canister.system_state.best_effort_message_memory_usage()) + .sum() + } + + /// Computes the per-resource [`MemoryTaken`] aggregate across every + /// canister in either pool. + /// + /// Does **not** include subnet queues, call `ReplicatedState::memory_taken` + /// for the actual subnet-wide memory usage. + /// + /// `O(|hot canisters|)` thanks to the precomputed cold-pool aggregate. + // The only non-test caller of this method is + // `ReplicatedState::memory_taken`, which is wired up in the follow-up + // integration PR. + #[allow(dead_code)] + pub(crate) fn memory_taken(&self) -> MemoryTaken { + let ( + mut execution, + mut guaranteed_response_messages, + best_effort_messages, + mut wasm_custom_sections, + mut canister_history, + ) = self + .hot + .values() + .map(|canister| { + ( + canister.memory_allocated_bytes(), + canister + .system_state + .guaranteed_response_message_memory_usage(), + canister.system_state.best_effort_message_memory_usage(), + canister.wasm_custom_sections_memory_usage(), + canister.canister_history_memory_usage(), + ) + }) + .reduce(|accum, val| { + ( + accum.0 + val.0, + accum.1 + val.1, + accum.2 + val.2, + accum.3 + val.3, + accum.4 + val.4, + ) + }) + .unwrap_or_default(); + + let cold = &self.cold_stats; + execution += cold.execution_memory; + guaranteed_response_messages += cold.guaranteed_response_message_memory; + // `best_effort_messages` has no cold contribution: cold canisters have no + // messages in their queues. + wasm_custom_sections += cold.wasm_custom_sections_memory; + canister_history += cold.canister_history_memory; + + MemoryTaken { + execution, + guaranteed_response_messages, + best_effort_messages, + wasm_custom_sections, + canister_history, + } + } + /// Validates that the current hot/cold partition matches the canonical "strict" /// split that [`CanisterStates::new`] would produce over the same set of /// canisters: @@ -372,10 +653,10 @@ impl CanisterStates { } /// Debug-only consistency check, called at the end of every mutating operation. - /// Verifies invariants (1)–(2) listed under [`CanisterStates`]. + /// Verifies invariants (1)–(3) listed under [`CanisterStates`]. /// /// Also see [`Self::validate_strict_split`] for the stricter validation applied - /// during checkpoint validation. + /// at checkpointing time. fn debug_assert_invariants(&self) { debug_assert!( self.hot.keys().all(|id| !self.cold.contains_key(id)), @@ -385,10 +666,15 @@ impl CanisterStates { self.cold.values().all(|c| c.is_cold()), "cold pool contains a canister that is not cold", ); + debug_assert_eq!( + ColdStats::recompute(self.cold.values()), + self.cold_stats, + "cold_stats is out of sync with the cold pool" + ); } } -/// Iterator returned by [`CanisterStates::all_iter`]. Merge-yields entries from +/// Iterator returned by [`CanisterStates::iter`]. Merge-yields entries from /// the `hot` and `cold` pools in `CanisterId` order. pub struct Iter<'a> { hot: Peekable>>, diff --git a/rs/replicated_state/src/canister_states/tests.rs b/rs/replicated_state/src/canister_states/tests.rs index 7eb5a5bdb72c..1c1c09891af5 100644 --- a/rs/replicated_state/src/canister_states/tests.rs +++ b/rs/replicated_state/src/canister_states/tests.rs @@ -5,8 +5,11 @@ use ic_base_types::NumSeconds; use ic_registry_subnet_type::SubnetType; use ic_test_utilities_types::ids::canister_test_id; use ic_test_utilities_types::messages::RequestBuilder; -use ic_types::messages::RequestOrResponse; -use ic_types_cycles::Cycles; +use ic_types::messages::{CallContextId, NO_DEADLINE, RequestOrResponse}; +use ic_types::methods::{Callback, WasmClosure}; +use ic_types::time::{CoarseTime, UNIX_EPOCH}; +use ic_types::{ComputeAllocation, NumBytes}; +use ic_types_cycles::{CanisterCyclesCostSchedule, CompoundCycles, Cycles}; use std::sync::Arc; fn make_canister(id: u64) -> Arc { @@ -42,6 +45,11 @@ fn push_input(canister: &mut Arc) { .unwrap(); } +fn set_compute_allocation(canister: &mut Arc, percentage: u64) { + Arc::make_mut(canister).system_state.compute_allocation = + ComputeAllocation::try_from(percentage).unwrap(); +} + fn hot_canister(id: u64) -> Arc { let mut canister = make_canister(id); push_input(&mut canister); @@ -52,6 +60,24 @@ fn cold_canister(id: u64) -> Arc { make_canister(id) } +/// Builds a canister that satisfies [`CanisterState::is_cold`] but holds an +/// input-slot reservation for an outstanding guaranteed-response request. Such +/// canisters contribute to `cold_stats.guaranteed_response_message_memory`. +fn cold_canister_with_guaranteed_response_reservation(id: u64) -> Arc { + let mut canister = make_canister(id); + // Push an output request, making an input queue slot reservation. + let request = ic_test_utilities_types::messages::RequestBuilder::default() + .sender(canister.canister_id()) + .receiver(canister_test_id(999)) + .build(); + Arc::make_mut(&mut canister) + .push_output_request(Arc::new(request), UNIX_EPOCH) + .unwrap(); + // Drain the output queue to leave only the reservation behind. + let _ = Arc::make_mut(&mut canister).output_into_iter().count(); + canister +} + #[test] fn fresh_canister_is_cold() { let canister = cold_canister(1); @@ -124,29 +150,35 @@ fn insert_classifies_on_the_fly() { } #[test] -fn insert_replaces_existing_canister() { +fn insert_replaces_existing_canister_and_updates_cold_stats() { let mut states = CanisterStates::default(); - let cold = cold_canister(1); // Initial insert: lands in cold. + let mut cold = cold_canister(1); + set_compute_allocation(&mut cold, 42); assert!(states.insert(Arc::clone(&cold)).is_none()); assert_eq!(states.hot.len(), 0); assert_eq!(states.cold.len(), 1); + assert_eq!(states.cold_stats.total_compute_allocation_percent, 42); // Replace with a hot version of the same canister: returns the cold one, - // partition is updated. - let hot = hot_canister(1); + // `cold_stats` reflects the new (empty) cold pool. + let mut hot = hot_canister(1); + set_compute_allocation(&mut hot, 42); let prev = states.insert(Arc::clone(&hot)).expect("upsert"); assert!(Arc::ptr_eq(&prev, &cold)); assert_eq!(states.hot.len(), 1); assert_eq!(states.cold.len(), 0); + assert_eq!(states.cold_stats.total_compute_allocation_percent, 0); - // Replace again with a cold version: partition flips back. - let cold_again = cold_canister(1); + // Replace again with a cold version: cold_stats picks it back up. + let mut cold_again = cold_canister(1); + set_compute_allocation(&mut cold_again, 42); let prev = states.insert(Arc::clone(&cold_again)).expect("upsert"); assert!(Arc::ptr_eq(&prev, &hot)); assert_eq!(states.hot.len(), 0); assert_eq!(states.cold.len(), 1); + assert_eq!(states.cold_stats.total_compute_allocation_percent, 42); } #[test] @@ -164,7 +196,7 @@ fn get_mut_heats_a_cold_canister() { } #[test] -fn remove_takes_canister_from_either_pool() { +fn remove_updates_cold_stats() { let mut states = CanisterStates::default(); let canister = cold_canister(1); states.insert(Arc::clone(&canister)); @@ -266,8 +298,10 @@ fn for_each_mut_visits_every_canister_and_repartitions() { fn for_each_mut_demotes_a_hot_canister_that_became_cold() { let mut states = CanisterStates::default(); - // c1: hot (has an input message). - let c1 = hot_canister(1); + // c1: hot (has an input message). Tag it with a distinguishable compute + // allocation so that we can verify it lands in `cold_stats` after demotion. + let mut c1 = hot_canister(1); + set_compute_allocation(&mut c1, 42); // c2: already cold, kept untouched as a partition witness. let c2 = cold_canister(2); @@ -276,6 +310,10 @@ fn for_each_mut_demotes_a_hot_canister_that_became_cold() { assert_eq!(states.hot.len(), 1); assert_eq!(states.cold.len(), 1); + // c1 is in hot, so its 42% compute allocation is not yet in `cold_stats`. + assert_eq!(states.cold_stats.total_compute_allocation_percent, 0); + assert_eq!(states.total_compute_allocation(), 42); + // Drain c1's input queue inside the closure: this flips c1 from non-cold // back to cold. `for_each_mut` must demote it as part of its final // `try_cool_all` pass. @@ -290,9 +328,31 @@ fn for_each_mut_demotes_a_hot_canister_that_became_cold() { }); assert_eq!(visited, vec![c1.canister_id(), c2.canister_id()]); - // Both canisters end up in cold. + // Both canisters end up in cold; cold_stats picks up c1's 42% compute. assert_eq!(states.hot.len(), 0); assert_eq!(states.cold.len(), 2); + assert_eq!(states.cold_stats.total_compute_allocation_percent, 42); + assert_eq!(states.total_compute_allocation(), 42); +} + +#[test] +fn for_each_mut_updates_cold_stats_in_place() { + let mut states = CanisterStates::default(); + let c = cold_canister(1); + states.insert(c); + // Sanity: c is cold and contributes 0% compute allocation. + assert_eq!(states.cold.len(), 1); + assert_eq!(states.total_compute_allocation(), 0); + + // Mutate `compute_allocation` on the cold canister without affecting + // `is_cold()`. The in-place iteration must update `cold_stats` accordingly. + states.for_each_mut(|_id, canister| { + set_compute_allocation(canister, 42); + }); + + assert_eq!(states.hot.len(), 0); + assert_eq!(states.cold.len(), 1); + assert_eq!(states.total_compute_allocation(), 42); } #[test] @@ -378,7 +438,7 @@ fn try_for_each_mut_short_circuits_on_cold_error_and_promotes_visited() { Ok(()) } else if *id == c2.canister_id() { // Mutate AND return Err on the same canister: c2's mutation must - // still be reflected in the partition. + // still be reflected in the partition / cold_stats. push_input(canister); Err("boom") } else { @@ -390,25 +450,33 @@ fn try_for_each_mut_short_circuits_on_cold_error_and_promotes_visited() { // Iteration stopped at c2; c3 never visited. assert_eq!(visited, vec![c1.canister_id(), c2.canister_id()]); // c1 and c2 both became non-cold during iteration: both end up in hot. - // c3 remains in cold. + // c3 remains in cold. `debug_assert_invariants` (run inside + // `try_for_each_mut`) covers the matching `cold_stats` invariant. assert!(states.hot.contains_key(&c1.canister_id())); assert!(states.hot.contains_key(&c2.canister_id())); assert!(states.cold.contains_key(&c3.canister_id())); } #[test] -fn retain_drops_canisters_from_both_pools() { +fn retain_updates_cold_stats_for_removed_cold_canisters() { let mut states = CanisterStates::default(); - let c1 = cold_canister(1); - let c2 = cold_canister(2); - let c3 = hot_canister(3); - let c4 = hot_canister(4); + let mut c1 = cold_canister(1); + let mut c2 = cold_canister(2); + let mut c3 = hot_canister(3); + let mut c4 = hot_canister(4); + // Give every canister a distinguishable compute allocation so we can spot + // bookkeeping errors via `total_compute_allocation()`. + for (i, canister) in [&mut c1, &mut c2, &mut c3, &mut c4].into_iter().enumerate() { + set_compute_allocation(canister, 10 + 10 * i as u64); + } states.insert(Arc::clone(&c1)); states.insert(Arc::clone(&c2)); states.insert(Arc::clone(&c3)); states.insert(Arc::clone(&c4)); assert_eq!(states.hot.len(), 2); assert_eq!(states.cold.len(), 2); + // 10% + 20% + 30% + 40% = 100%. + assert_eq!(states.total_compute_allocation(), 100); let keep = [c1.canister_id(), c3.canister_id()]; states.retain(|id, _| keep.contains(id)); @@ -419,6 +487,173 @@ fn retain_drops_canisters_from_both_pools() { assert!(!states.contains_key(&c2.canister_id())); assert!(states.contains_key(&c3.canister_id())); assert!(!states.contains_key(&c4.canister_id())); + // 10% (c1) + 30% (c3) = 40%. + assert_eq!(states.total_compute_allocation(), 40); +} + +#[test] +fn memory_aggregators_combine_hot_and_cold() { + let mut c1 = cold_canister(1); + let mut c2 = hot_canister(2); + // Distinguishable memory allocation reservations so that `execution` + // depends visibly on both canisters. + Arc::make_mut(&mut c1).system_state.memory_allocation = NumBytes::new(10_000_000).into(); + Arc::make_mut(&mut c2).system_state.memory_allocation = NumBytes::new(20_000_000).into(); + + let mut states = CanisterStates::default(); + states.insert(Arc::clone(&c1)); + states.insert(Arc::clone(&c2)); + // Both pools must be exercised, otherwise this test does not cover the + // hot-iteration + cold-aggregate combination. + assert_eq!(states.hot.len(), 1); + assert_eq!(states.cold.len(), 1); + + // Expected values, computed by summing over every canister independently + // of the hot/cold split. + let canisters = [&c1, &c2]; + let expected_memory_usage: NumBytes = canisters + .iter() + .map(|c| c.memory_usage() + c.message_memory_usage().total()) + .sum(); + let expected_guaranteed: NumBytes = canisters + .iter() + .map(|c| c.system_state.guaranteed_response_message_memory_usage()) + .sum(); + let expected_best_effort: NumBytes = canisters + .iter() + .map(|c| c.system_state.best_effort_message_memory_usage()) + .sum(); + let expected_execution: NumBytes = canisters.iter().map(|c| c.memory_allocated_bytes()).sum(); + let expected_wasm_sections: NumBytes = canisters + .iter() + .map(|c| c.wasm_custom_sections_memory_usage()) + .sum(); + let expected_history: NumBytes = canisters + .iter() + .map(|c| c.canister_history_memory_usage()) + .sum(); + + // Sanity: the hot canister contributes non-zero guaranteed-response message + // memory (from `push_input`) and both canisters contribute to `execution` + // (from `memory_allocation`); otherwise a broken aggregator returning 0 could + // trivially pass. + assert!(expected_guaranteed > NumBytes::new(0)); + assert!(expected_execution == NumBytes::new(30_000_000)); + // Also ensure that message memory usage contributes a non-zero amount to + // `total_canister_memory_usage`. + let total_message_memory: NumBytes = canisters + .iter() + .map(|c| c.message_memory_usage().total()) + .sum(); + assert!(total_message_memory > NumBytes::new(0)); + // The actual `total_canister_memory_usage` only includes message memory + // (canisters have no execution state). + assert_eq!(expected_memory_usage, total_message_memory); + + assert_eq!(states.total_canister_memory_usage(), expected_memory_usage); + assert_eq!( + states.guaranteed_response_message_memory_taken(), + expected_guaranteed + ); + assert_eq!( + states.best_effort_message_memory_taken(), + expected_best_effort + ); + + let mt = states.memory_taken(); + assert_eq!(mt.execution, expected_execution); + assert_eq!(mt.guaranteed_response_messages, expected_guaranteed); + assert_eq!(mt.best_effort_messages, expected_best_effort); + assert_eq!(mt.wasm_custom_sections, expected_wasm_sections); + assert_eq!(mt.canister_history, expected_history); +} + +/// A cold canister holding a guaranteed-response slot reservation must still +/// contribute its reservation memory to +/// `guaranteed_response_message_memory_taken` and `memory_taken`. +#[test] +fn cold_canister_with_guaranteed_response_reservation_is_aggregated() { + let canister = cold_canister_with_guaranteed_response_reservation(1); + // Sanity: the test fixture is what it claims to be. + assert!(canister.is_cold()); + let guaranteed_response_message_memory = canister + .system_state + .guaranteed_response_message_memory_usage(); + assert!(guaranteed_response_message_memory > NumBytes::new(0),); + + let mut states = CanisterStates::default(); + states.insert(Arc::clone(&canister)); + assert_eq!(states.hot.len(), 0); + assert_eq!(states.cold.len(), 1); + + // The canister sits in `cold` but contributes a non-zero amount to both + // `guaranteed_response_message_memory_taken` and `memory_taken`. + assert_eq!( + states.guaranteed_response_message_memory_taken(), + guaranteed_response_message_memory, + ); + assert_eq!( + states.memory_taken().guaranteed_response_messages, + guaranteed_response_message_memory, + ); + + // Promoting the canister to `hot` (e.g. via `get_mut`) must not change the + // aggregate. + let _ = states.get_mut(&canister.canister_id()); + assert_eq!(states.hot.len(), 1); + assert_eq!(states.cold.len(), 0); + assert_eq!( + states.guaranteed_response_message_memory_taken(), + guaranteed_response_message_memory, + ); +} + +#[test] +fn callback_count_combines_hot_and_cold() { + fn make_callback(deadline: CoarseTime) -> Callback { + Callback::new( + CallContextId::from(1), + canister_test_id(999), + Cycles::zero(), + CompoundCycles::new(Cycles::zero(), CanisterCyclesCostSchedule::Normal), + CompoundCycles::new(Cycles::zero(), CanisterCyclesCostSchedule::Normal), + CompoundCycles::new(Cycles::zero(), CanisterCyclesCostSchedule::Normal), + WasmClosure::new(0, 0), + WasmClosure::new(0, 0), + None, + deadline, + ) + } + + // A canister with only guaranteed-response (never-expiring) callbacks is + // still cold by definition (see `CanisterState::is_cold`), which lets us + // exercise the `cold_stats.callback_count` aggregate. + let mut cold = cold_canister(1); + Arc::make_mut(&mut cold) + .system_state + .register_callback(make_callback(NO_DEADLINE)) + .unwrap(); + assert!(cold.is_cold()); + + // A hot canister with two callbacks (one guaranteed, one best-effort). + let mut hot = hot_canister(2); + Arc::make_mut(&mut hot) + .system_state + .register_callback(make_callback(NO_DEADLINE)) + .unwrap(); + Arc::make_mut(&mut hot) + .system_state + .register_callback(make_callback(CoarseTime::from_secs_since_unix_epoch(1))) + .unwrap(); + assert!(!hot.is_cold()); + + let mut states = CanisterStates::default(); + states.insert(cold); + states.insert(hot); + + // Total = 1 (cold contribution, via cold_stats) + 2 (hot, via iteration). + assert_eq!(states.cold_stats.callback_count, 1); + assert_eq!(states.callback_count(), 3); } #[test] @@ -439,7 +674,7 @@ fn validate_strict_split_rejects_stale_hot_canister() { states.insert(Arc::clone(&c)); // `get_mut` promotes c to hot without actually mutating anything, so c // ends up cold-by-predicate but in the `hot` pool — exactly the stale - // state that `try_cool_all` is supposed to clean up. + // state that `try_cool_all` / `repartition` are supposed to clean up. let _ = states.get_mut(&c.canister_id()); assert_eq!(states.hot.len(), 1); assert_eq!(states.cold.len(), 0); diff --git a/rs/replicated_state/src/replicated_state.rs b/rs/replicated_state/src/replicated_state.rs index ebee68d7506b..8282410fd9ec 100644 --- a/rs/replicated_state/src/replicated_state.rs +++ b/rs/replicated_state/src/replicated_state.rs @@ -364,15 +364,15 @@ pub struct MemoryTaken { /// specified and the actual canister memory usage (including /// Wasm custom sections) where no explicit memory reservation /// has been made. - execution: NumBytes, + pub(crate) execution: NumBytes, /// Memory taken by guaranteed response canister messages or reservations. - guaranteed_response_messages: NumBytes, + pub(crate) guaranteed_response_messages: NumBytes, /// Memory taken by best-effort canister messages. - best_effort_messages: NumBytes, + pub(crate) best_effort_messages: NumBytes, /// Memory taken by Wasm Custom Sections. - wasm_custom_sections: NumBytes, + pub(crate) wasm_custom_sections: NumBytes, /// Memory taken by canister history. - canister_history: NumBytes, + pub(crate) canister_history: NumBytes, } impl MemoryTaken {