diff --git a/crates/beacon_state/src/tile.rs b/crates/beacon_state/src/tile.rs index 8df90c2..b800c7e 100644 --- a/crates/beacon_state/src/tile.rs +++ b/crates/beacon_state/src/tile.rs @@ -5,7 +5,7 @@ use flux::{ use silver_common::{ BeaconStateEvent, GossipTopic, NewGossipMsg, P2pStreamId, PeerEvent, RejectSource, RpcInbound, RpcMsg, RpcResponse, RpcResponseInbound, RpcSeverity, SilverSpine, SpecConfig, SyncUpdate, - TCacheRead, TRandomAccess, + TRandomAccess, TRead, ssz_view::{ AttesterSlashingView, PROPOSER_SLASHING_SIZE, ProposerSlashingView, SIGNED_BLS_CHANGE_SIZE, SIGNED_VOLUNTARY_EXIT_SIZE, SINGLE_ATT_SIZE, STATUS_V2_SIZE, SignedAggregateAndProofView, @@ -536,7 +536,7 @@ impl BeaconStateTile { fn apply_block( &mut self, data: &[u8], - data_tcache: TCacheRead, + data_tcache: TRead, source: RejectSource, producers: &mut Producers, ) -> Feedback { @@ -560,7 +560,7 @@ impl BeaconStateTile { return f; } - producers.produce(BeaconStateEvent::PersistBlock(data_tcache)); + producers.produce(BeaconStateEvent::PersistBlock(data_tcache.read)); let head_changed = self.last_applied != prev_last_applied; let new_finalized = @@ -762,7 +762,8 @@ impl BeaconStateTile { fn handle_gossip(&mut self, m: NewGossipMsg, data: &[u8], producers: &mut Producers) { let feedback = match m.topic { GossipTopic::BeaconBlock => { - Some(self.apply_block(data, m.ssz, RejectSource::Gossip, producers)) + let acquired = self.gossip_consumer.acquire(m.ssz); + Some(self.apply_block(data, acquired, RejectSource::Gossip, producers)) } GossipTopic::BeaconAttestation(_) => Some(self.handle_attestation(data)), GossipTopic::BeaconAggregateAndProof => Some(self.handle_aggregate_and_proof(data)), @@ -794,7 +795,7 @@ impl BeaconStateTile { msg: RpcMsg, sender: P2pStreamId, data: &[u8], - data_tcache: TCacheRead, + data_tcache: TRead, producers: &mut Producers, ) { if let RpcMsg::BlocksRangeResp(_) = msg { @@ -1581,7 +1582,7 @@ impl Tile for BeaconStateTile { } }); - adapter.consume(|m: RpcInbound, producers| { + adapter.consume_one(|m: RpcInbound, producers| { if let RpcInbound::Response(RpcResponseInbound { application_id: _, stream_id, @@ -1597,7 +1598,7 @@ impl Tile for BeaconStateTile { RpcMsg::BlocksRangeResp(SignedBeaconBlockView), stream_id, unsafe { &*p }, - ssz, + acquired, producers, ); } diff --git a/crates/common/src/config/mod.rs b/crates/common/src/config/mod.rs index f89a86d..5e9e5bc 100644 --- a/crates/common/src/config/mod.rs +++ b/crates/common/src/config/mod.rs @@ -72,7 +72,7 @@ pub struct Config { outgoing_gossip_tcache_size: usize, #[serde(default = "default_usize::<33554432>")] // 2 << 24 incoming_gossip_ssz_tcache_size: usize, - #[serde(default = "default_usize::<33554432>")] // 2 << 24 + #[serde(default = "default_usize::<67108864>")] // 2 << 25 incoming_rpc_tcache_size: usize, #[serde(default = "default_usize::<33554432>")] // 2 << 24 outgoing_rpc_tcache_size: usize, @@ -114,7 +114,7 @@ impl Config { incoming_gossip_tcache_size: 2 << 24, // protobuf outgoing_gossip_tcache_size: 2 << 24, // protobuf incoming_gossip_ssz_tcache_size: 2 << 24, // ssz - incoming_rpc_tcache_size: 2 << 24, // ssz + incoming_rpc_tcache_size: 2 << 25, // ssz outgoing_rpc_tcache_size: 2 << 24, // ssz } } diff --git a/crates/common/src/metrics.rs b/crates/common/src/metrics.rs index 6e44b72..102e763 100644 --- a/crates/common/src/metrics.rs +++ b/crates/common/src/metrics.rs @@ -73,27 +73,16 @@ impl Drop for TimerGuard { } /// Open / create the counters file, ftruncate to `bytes`, mmap shared, -/// and publish the base pointer to `target`. No-op if `target` is -/// already non-null (idempotent across repeat `init()` calls). -/// -/// Counter files land in flux's standard shmem-queues directory — -/// `{base_dir}/{app_name}/shmem/queues/counters-{file_name}` — so a -/// single observer-side scan picks up both flux queues and counter -/// files in one walk. -/// -/// Called by macro-generated `init()` functions; not intended for -/// direct use. -pub fn map_counters( +/// and return the base pointer. Counter files land in flux's standard +/// shmem-queues directory — +/// `{base_dir}/{app_name}/shmem/queues/counters-{file_name}` — so a single +/// observer-side scan picks up both flux queues and counter files in one walk. +pub fn mmap_counters_file( base_dir: &Path, app_name: &str, file_name: &str, bytes: usize, - target: &AtomicPtr, -) -> io::Result<()> { - if !target.load(Ordering::Relaxed).is_null() { - return Ok(()); - } - +) -> io::Result<*mut AtomicU64> { let dir = flux::utils::directories::shmem_dir_queues_with_base(base_dir, app_name); std::fs::create_dir_all(&dir)?; let path = dir.join(format!("counters-{file_name}")); @@ -116,7 +105,24 @@ pub fn map_counters( return Err(io::Error::last_os_error()); } - target.store(ptr.cast::(), Ordering::Release); + Ok(ptr.cast::()) +} + +/// As `mmap_counters_file` but publishes the result into a +/// `AtomicPtr` target — used by the static-pointer pattern emitted by +/// `declare_counters!`. No-op if `target` is already non-null. +pub fn map_counters( + base_dir: &Path, + app_name: &str, + file_name: &str, + bytes: usize, + target: &AtomicPtr, +) -> io::Result<()> { + if !target.load(Ordering::Relaxed).is_null() { + return Ok(()); + } + let ptr = mmap_counters_file(base_dir, app_name, file_name, bytes)?; + target.store(ptr, Ordering::Release); Ok(()) } diff --git a/crates/common/src/spine/tcache.rs b/crates/common/src/spine/tcache.rs index 781220b..4bf839d 100644 --- a/crates/common/src/spine/tcache.rs +++ b/crates/common/src/spine/tcache.rs @@ -9,7 +9,7 @@ use std::{ }; pub use consumer::{AcquiredRead, Consumer, RandomAccessConsumer, TCacheRead}; -use flux::timing::Nanos; +use flux::{timing::Nanos, tracing}; pub use producer::{MultiProducer, Producer, Reservation, TCacheProducer}; use thiserror::Error; @@ -20,8 +20,11 @@ const MAX_CONSUMERS: usize = 64; const ALIGN: usize = size_of::(); mod consumer; +mod metrics; mod producer; +use metrics::TCacheMetrics; + /// Single or multi producer, multi consumer cache buffer with a Tail /// /// _ /)---(\ @@ -39,6 +42,9 @@ pub struct TCache { head: TCacheHead, len: u32, data: Box<[u8]>, + /// `None` when name is empty, or if mmap'ing the metrics file + /// fails (tile continues to function without surfer visibility). + metrics: Option, } #[derive(Copy, Clone, Debug)] @@ -107,6 +113,10 @@ impl TCache { }) .ok_or(Error::MaxConsumers)?; + // Publish a baseline tail so surfer's chart has a value to plot + // before the consumer has actually called free. + self.record_tail(index, seq); + Ok(Consumer { cache: TCacheRef { cache: addr_of!(*self) as *const c_void }, index, @@ -128,6 +138,8 @@ impl TCache { }) .ok_or(Error::MaxConsumers)?; + self.record_tail(index, seq); + Ok(RandomAccessConsumer { cache: TCacheRef { cache: addr_of!(*self) as *const c_void }, index, @@ -286,7 +298,13 @@ impl TCache { if success { slot.skip = 0; } + let new_head = seq + slot.reservation_len as u64; slot.seq = AtomicU64::new(seq); + // Track the producer's actual progress for the metrics layer. + // `head.seq` (visible to joining consumers) is only updated by + // `publish_head` on out-of-space, but surfer wants the live + // production cursor. + self.record_head(new_head); } fn alloc_tache(name: &'static str, size: usize) -> Box { @@ -295,6 +313,15 @@ impl TCache { "n is not mutiple of {ALIGN}" ); let layout = Layout::from_size_align(size, ALIGN).unwrap(); + let metrics = if name.is_empty() { + None + } else { + // MAX_CONSUMERS — overcount is fine for the file size; surfer + // ignores never-set tails (they stay at u64::MAX sentinel). + TCacheMetrics::new(name, MAX_CONSUMERS, size as u64) + .map_err(|e| tracing::warn!(?name, ?e, "TCacheMetrics::new failed")) + .ok() + }; unsafe { let ptr = alloc::alloc_zeroed(layout); if ptr.is_null() { @@ -309,9 +336,31 @@ impl TCache { }, len: data.len() as u32, data, + metrics, }) } } + + #[inline] + pub(super) fn record_head(&self, seq: u64) { + if let Some(m) = &self.metrics { + m.set_head_seq(seq); + } + } + + /// Record a consumer's tail seq into the per-tcache metrics slot. + /// A seq of 0 is interpreted as "no real progress" and is mapped + /// to the `u64::MAX` sentinel — surfer then treats it as + /// "tail == head" rather than dragging `min_tail` to 0. The + /// in-memory `head.tails[idx]` is unaffected; this is purely the + /// metrics view. + #[inline] + pub(super) fn record_tail(&self, idx: usize, seq: u64) { + if let Some(m) = &self.metrics { + let v = if seq == 0 { u64::MAX } else { seq }; + m.set_tail_seq(idx, v); + } + } } #[repr(C)] diff --git a/crates/common/src/spine/tcache/consumer.rs b/crates/common/src/spine/tcache/consumer.rs index f337436..cac19e1 100644 --- a/crates/common/src/spine/tcache/consumer.rs +++ b/crates/common/src/spine/tcache/consumer.rs @@ -78,6 +78,7 @@ impl Consumer { //tracing::warn!("consumer free: {}", self.seq); self.seq = self.next_seq; self.cache.head.tails[self.index].store(self.seq, Ordering::Release); + self.cache.record_tail(self.index, self.seq); } } @@ -103,7 +104,12 @@ impl RandomAccessConsumer { /// visible to the Producer. pub fn free(&self) { let tail = self.active.tail_seq; - self.cache.head.tails[self.index].store(tail, Ordering::Release); + + if tail != u64::MAX { + //tracing::warn!("Random consumer free {tail}"); + self.cache.head.tails[self.index].store(tail, Ordering::Release); + self.cache.record_tail(self.index, tail); + } } fn release(&mut self, seq: u64) { @@ -168,6 +174,7 @@ pub(super) struct Buckets { head_seq: u64, bucket_size: u64, bucket_shift: u64, + bucket_mask: u64, // max difference between head and tail, before 'forced' eviction lag_threshold: u64, } @@ -188,56 +195,55 @@ impl Buckets { head_seq: 0, bucket_size, bucket_shift: bucket_size.trailing_zeros() as u64, + bucket_mask: !(bucket_size - 1), lag_threshold: (LAG_PERCENT * (cache_capacity as f64)) as u64, } } fn acquire(&mut self, seq: u64) { - let bucket_idx = self.index(seq); + let bucket_idx = self.bucket_index(seq); self.buckets[bucket_idx] += 1; self.head_seq = self.head_seq.max(seq); if self.tail_seq == u64::MAX { - self.tail_seq = seq & !(self.bucket_size - 1); + self.tail_seq = self.bucket_start_seq(seq); } - // check lagging - if self.head_seq - self.tail_seq > self.lag_threshold { - let mut tail_idx = self.index(self.tail_seq); - self.buckets[tail_idx] = 0; - - while self.buckets[tail_idx] == 0 && (self.tail_seq + self.bucket_size) < self.head_seq - { - tail_idx = (tail_idx + 1) & (self.buckets.len() - 1); - self.tail_seq += self.bucket_size; + // rollup tail for completed buckets + let head_bucket_seq = self.bucket_start_seq(seq); + while head_bucket_seq > self.tail_seq { + let tail_bucket = self.bucket_index(self.tail_seq); + if self.head_seq - self.tail_seq > self.lag_threshold { + tracing::warn!("unfreed lagging consumers dropped!"); + self.buckets[tail_bucket] = 0; } + if self.buckets[tail_bucket] != 0 { + break; + } + self.tail_seq += self.bucket_size; } } fn release(&mut self, seq: u64) { if seq < self.tail_seq { + tracing::warn!("tried to release: {seq} which is < {}", self.tail_seq); return; } - let mut bucket_idx = self.index(seq); + let bucket_idx = self.bucket_index(seq); self.buckets[bucket_idx] = self.buckets[bucket_idx].saturating_sub(1); - - let mut bucket_tail_seq = seq & !(self.bucket_size - 1); - if bucket_tail_seq == self.tail_seq { - while self.buckets[bucket_idx] == 0 && - (bucket_tail_seq + self.bucket_size) < self.head_seq - { - bucket_idx = (bucket_idx + 1) & (self.buckets.len() - 1); - bucket_tail_seq += self.bucket_size; - } - self.tail_seq = bucket_tail_seq; - } } - fn index(&self, seq: u64) -> usize { + #[inline] + fn bucket_index(&self, seq: u64) -> usize { ((seq >> self.bucket_shift) as usize) & (self.buckets.len() - 1) } + + #[inline] + fn bucket_start_seq(&self, seq: u64) -> u64 { + seq & self.bucket_mask + } } #[cfg(test)] @@ -268,6 +274,7 @@ mod tests { b.acquire(200); // bucket 3, head = 200 assert_eq!(b.tail_seq, 0); b.release(0); // bucket 0 empties; head far enough ahead to advance + b.acquire(300); assert!(b.tail_seq > 0, "tail did not advance: {}", b.tail_seq); assert!(b.tail_seq <= 200); } @@ -283,6 +290,7 @@ mod tests { assert_eq!(b.tail_seq, 0, "tail moved while bucket 0 still held"); // Release the head; tail jumps past bucket 0 and bucket 1 (both empty) b.release(0); + b.acquire(350); assert!(b.tail_seq >= 128, "tail did not jump: {}", b.tail_seq); } diff --git a/crates/common/src/spine/tcache/metrics.rs b/crates/common/src/spine/tcache/metrics.rs new file mode 100644 index 0000000..d96f81c --- /dev/null +++ b/crates/common/src/spine/tcache/metrics.rs @@ -0,0 +1,83 @@ +//! Per-TCache shmem-backed counter set. +//! +//! Layout in `counters-tcache-{name}` (each slot is a `u64`): +//! 0: capacity (bytes) — set once at construction. +//! 1: head_seq — updated by `Producer::publish_head`. +//! 2..2+N: tail_seq[i] — updated by consumer `i`'s `free`. +//! +//! Surfer renders the file as a ring-buffer occupancy bar +//! (head − min_tail / capacity) plus current length, decoded +//! dynamically from the file's slot count. + +use std::{ + io, + sync::atomic::{AtomicU64, Ordering}, +}; + +use crate::metrics::mmap_counters_file; + +/// Number of fixed (non-tail) slots in the layout. +const FIXED_SLOTS: usize = 2; +/// Index of the capacity slot. +const CAPACITY_IDX: usize = 0; +/// Index of the head_seq slot. +const HEAD_IDX: usize = 1; + +pub struct TCacheMetrics { + base: *mut AtomicU64, + n_consumers: usize, + map_bytes: usize, +} + +// SAFETY: `base` points at an `mmap(MAP_SHARED)` region; every access +// goes through atomic ops. The mmap is alive for the lifetime of the +// `TCacheMetrics` value (Drop calls munmap). +unsafe impl Send for TCacheMetrics {} +unsafe impl Sync for TCacheMetrics {} + +impl TCacheMetrics { + /// mmap the `counters-tcache-{name}` file under + /// `flux::utils::directories::local_share_dir() / "silver" / shmem / + /// queues`, pre-populate the capacity slot. + pub fn new(name: &str, n_consumers: usize, capacity: u64) -> io::Result { + let slots = FIXED_SLOTS + n_consumers; + let map_bytes = slots * std::mem::size_of::(); + let file_name = format!("tcache-{name}"); + let base_dir = flux::utils::directories::local_share_dir(); + let base = mmap_counters_file(&base_dir, "silver", &file_name, map_bytes)?; + // SAFETY: ptr is valid for `slots` AtomicU64s. + unsafe { + (*base.add(CAPACITY_IDX)).store(capacity, Ordering::Relaxed); + // Tails start at u64::MAX sentinel — matches the TCache's + // own `head.tails` initialisation. Surfer treats this as + // "tail == head" (no consumer for this slot). + for i in 0..n_consumers { + (*base.add(FIXED_SLOTS + i)).store(u64::MAX, Ordering::Relaxed); + } + } + Ok(Self { base, n_consumers, map_bytes }) + } + + #[inline] + pub fn set_head_seq(&self, v: u64) { + // SAFETY: HEAD_IDX is < slot count by construction. + unsafe { (*self.base.add(HEAD_IDX)).store(v, Ordering::Relaxed) }; + } + + #[inline] + pub fn set_tail_seq(&self, consumer_idx: usize, v: u64) { + debug_assert!(consumer_idx < self.n_consumers); + // SAFETY: consumer_idx bounded by `n_consumers` (debug-asserted). + unsafe { + (*self.base.add(FIXED_SLOTS + consumer_idx)).store(v, Ordering::Relaxed); + } + } +} + +impl Drop for TCacheMetrics { + fn drop(&mut self) { + // SAFETY: `base` was returned by mmap with `map_bytes`; no other + // references into this region outlive `self`. + unsafe { libc::munmap(self.base.cast::(), self.map_bytes) }; + } +} diff --git a/crates/common/src/spine/tcache/producer.rs b/crates/common/src/spine/tcache/producer.rs index d976073..bd5d939 100644 --- a/crates/common/src/spine/tcache/producer.rs +++ b/crates/common/src/spine/tcache/producer.rs @@ -11,7 +11,9 @@ pub trait TCacheProducer: SealedProducer { /// Publish the head sequence for joining consumers. fn publish_head(&self) { let tcache = unsafe { &*self.tcache() }; - tcache.head.seq.store(self.seq(), Ordering::Release); + let seq = self.seq(); + tcache.head.seq.store(seq, Ordering::Release); + tcache.record_head(seq); } fn cache_ref(&self) -> TCacheRef { diff --git a/crates/surfer/src/app.rs b/crates/surfer/src/app.rs index c599f8f..b0ae0f4 100644 --- a/crates/surfer/src/app.rs +++ b/crates/surfer/src/app.rs @@ -1,10 +1,16 @@ +use std::collections::HashSet; + use ratatui::widgets::TableState; -use crate::sources::{counters::CounterSet, tilemetrics::TileMetricsSet, timings::TimingSet}; +use crate::{ + discovery::DiscoveredSources, + sources::{counters::CounterSet, tilemetrics::TileMetricsSet, timings::TimingSet}, +}; #[derive(Clone, Copy, PartialEq, Eq)] pub enum Pane { Counters, + TCaches, Timings, Tiles, } @@ -13,6 +19,7 @@ impl Pane { pub fn label(self) -> &'static str { match self { Pane::Counters => "Counters", + Pane::TCaches => "TCaches", Pane::Timings => "Timings", Pane::Tiles => "Tiles", } @@ -20,7 +27,8 @@ impl Pane { pub fn next(self) -> Self { match self { - Pane::Counters => Pane::Timings, + Pane::Counters => Pane::TCaches, + Pane::TCaches => Pane::Timings, Pane::Timings => Pane::Tiles, Pane::Tiles => Pane::Counters, } @@ -32,6 +40,8 @@ pub struct App { pub counters: Vec, /// Currently selected (counter_set_idx, slot_idx) inside the counters pane. pub counters_selection: (usize, usize), + pub tcaches: Vec, + pub tcaches_selection: usize, pub timings: Vec, /// Selected timing-set index inside the timings pane. pub timings_selection: usize, @@ -50,6 +60,7 @@ pub struct App { /// pane's selection each frame and ratatui keeps the offset in /// view. pub counters_table_state: TableState, + pub tcaches_table_state: TableState, pub timings_table_state: TableState, pub tiles_table_state: TableState, pub quit: bool, @@ -63,6 +74,7 @@ const SPLIT_STEP: i32 = 5; impl App { pub fn new( counters: Vec, + tcaches: Vec, timings: Vec, tilemetrics: Vec, ) -> Self { @@ -70,6 +82,8 @@ impl App { pane: Pane::Counters, counters, counters_selection: (0, 0), + tcaches, + tcaches_selection: 0, timings, timings_selection: 0, tilemetrics, @@ -77,6 +91,7 @@ impl App { drilled_in: false, split_pct: SPLIT_DEFAULT, counters_table_state: TableState::default(), + tcaches_table_state: TableState::default(), timings_table_state: TableState::default(), tiles_table_state: TableState::default(), quit: false, @@ -106,10 +121,87 @@ impl App { self.split_pct = new as u16; } + /// Absorb any sources that appeared since the last discovery. + /// Insertion-only — existing handles keep their mmaps and history + /// rings. Selections are restored by name across the sort so a + /// newly-inserted source doesn't shift the user's highlight. + pub fn merge_new_sources(&mut self, sources: DiscoveredSources) { + // Counters. + let sel_name = self.counters.get(self.counters_selection.0).map(|c| c.name.clone()); + let existing: HashSet = self.counters.iter().map(|c| c.name.clone()).collect(); + for f in &sources.counters { + if !existing.contains(&f.name) { + if let Ok(c) = CounterSet::open(f) { + self.counters.push(c); + } + } + } + self.counters.sort_by(|a, b| a.name.cmp(&b.name)); + if let Some(n) = sel_name { + if let Some(idx) = self.counters.iter().position(|c| c.name == n) { + self.counters_selection.0 = idx; + } + } + + // TCaches. + let sel_name = self.tcaches.get(self.tcaches_selection).map(|c| c.name.clone()); + let existing: HashSet = self.tcaches.iter().map(|c| c.name.clone()).collect(); + for f in &sources.tcaches { + if !existing.contains(&f.name) { + if let Ok(c) = CounterSet::open(f) { + self.tcaches.push(c); + } + } + } + self.tcaches.sort_by(|a, b| a.name.cmp(&b.name)); + if let Some(n) = sel_name { + if let Some(idx) = self.tcaches.iter().position(|c| c.name == n) { + self.tcaches_selection = idx; + } + } + + // Timings. + let sel_name = self.timings.get(self.timings_selection).map(|t| t.name.clone()); + let existing: HashSet = self.timings.iter().map(|t| t.name.clone()).collect(); + for f in &sources.timings { + if !existing.contains(&f.name) { + if let Ok(t) = TimingSet::open(f) { + self.timings.push(t); + } + } + } + self.timings.sort_by(|a, b| a.name.cmp(&b.name)); + if let Some(n) = sel_name { + if let Some(idx) = self.timings.iter().position(|t| t.name == n) { + self.timings_selection = idx; + } + } + + // Tile metrics. + let sel_name = self.tilemetrics.get(self.tiles_selection).map(|t| t.name.clone()); + let existing: HashSet = self.tilemetrics.iter().map(|t| t.name.clone()).collect(); + for f in &sources.tilemetrics { + if !existing.contains(&f.name) { + if let Ok(t) = TileMetricsSet::open(f) { + self.tilemetrics.push(t); + } + } + } + self.tilemetrics.sort_by(|a, b| a.name.cmp(&b.name)); + if let Some(n) = sel_name { + if let Some(idx) = self.tilemetrics.iter().position(|t| t.name == n) { + self.tiles_selection = idx; + } + } + } + pub fn sample(&mut self) { for c in &mut self.counters { c.sample(); } + for c in &mut self.tcaches { + c.sample(); + } for t in &mut self.timings { t.drain(); } @@ -122,6 +214,12 @@ impl App { for c in &mut self.counters { c.roll_bucket(); } + for c in &mut self.tcaches { + // For tcaches the per-slot delta IS the metric — head/tail + // values are monotonically-increasing seq cursors so a 1 s + // delta gives bytes/sec produced or consumed. + c.roll_bucket(); + } for t in &mut self.timings { t.roll_bucket(); } @@ -133,11 +231,21 @@ impl App { pub fn move_selection(&mut self, dir: i32) { match self.pane { Pane::Counters => self.move_counter_selection(dir), + Pane::TCaches => self.move_tcache_selection(dir), Pane::Timings => self.move_timing_selection(dir), Pane::Tiles => self.move_tile_selection(dir), } } + fn move_tcache_selection(&mut self, dir: i32) { + if self.tcaches.is_empty() { + return; + } + let n = self.tcaches.len() as i32; + let new = (self.tcaches_selection as i32 + dir).rem_euclid(n); + self.tcaches_selection = new as usize; + } + fn move_tile_selection(&mut self, dir: i32) { if self.tilemetrics.is_empty() { return; diff --git a/crates/surfer/src/discovery.rs b/crates/surfer/src/discovery.rs index f69c9b0..ee9867c 100644 --- a/crates/surfer/src/discovery.rs +++ b/crates/surfer/src/discovery.rs @@ -12,6 +12,7 @@ use std::{fs, io, path::PathBuf}; pub struct DiscoveredSources { pub counters: Vec, + pub tcaches: Vec, pub timings: Vec, pub tilemetrics: Vec, } @@ -39,6 +40,7 @@ pub struct TileMetricsFile { pub fn discover(base_dir: &std::path::Path, app_name: &str) -> io::Result { let mut counters = Vec::new(); + let mut tcaches = Vec::new(); let mut timings = Vec::new(); let mut tilemetrics = Vec::new(); @@ -49,7 +51,12 @@ pub fn discover(base_dir: &std::path::Path, app_name: &str) -> io::Result io::Result io::Result<()> { let mut args = std::env::args().skip(1); @@ -57,6 +60,21 @@ fn main() -> io::Result<()> { c.sample(); } + let mut tcache_sets: Vec = sources + .tcaches + .iter() + .filter_map(|f| match CounterSet::open(f) { + Ok(c) => Some(c), + Err(e) => { + eprintln!("surfer: skipping {}: {e}", f.path.display()); + None + } + }) + .collect(); + for c in &mut tcache_sets { + c.sample(); + } + let mut timing_sets: Vec = sources .timings .iter() @@ -86,7 +104,7 @@ fn main() -> io::Result<()> { for t in &mut tile_sets { t.drain(); } - let mut app = App::new(counter_sets, timing_sets, tile_sets); + let mut app = App::new(counter_sets, tcache_sets, timing_sets, tile_sets); enable_raw_mode()?; let mut stdout = io::stdout(); @@ -94,7 +112,7 @@ fn main() -> io::Result<()> { let backend = CrosstermBackend::new(stdout); let mut term = Terminal::new(backend)?; - let result = run(&mut term, &mut app); + let result = run(&mut term, &mut app, &base_dir, &app_name); disable_raw_mode()?; execute!(term.backend_mut(), LeaveAlternateScreen)?; @@ -103,9 +121,15 @@ fn main() -> io::Result<()> { result } -fn run(term: &mut Terminal, app: &mut App) -> io::Result<()> { +fn run( + term: &mut Terminal, + app: &mut App, + base_dir: &std::path::Path, + app_name: &str, +) -> io::Result<()> { let mut last_tick = Instant::now(); let mut last_bucket = Instant::now(); + let mut last_discover = Instant::now(); loop { term.draw(|f| render::draw(f, app))?; @@ -125,6 +149,12 @@ fn run(term: &mut Terminal, app: &mut App) -> i app.roll_bucket(); last_bucket = Instant::now(); } + if last_discover.elapsed() >= DISCOVER { + if let Ok(s) = discovery::discover(base_dir, app_name) { + app.merge_new_sources(s); + } + last_discover = Instant::now(); + } if app.quit { return Ok(()); } diff --git a/crates/surfer/src/render/mod.rs b/crates/surfer/src/render/mod.rs index 6b8302e..4099aaa 100644 --- a/crates/surfer/src/render/mod.rs +++ b/crates/surfer/src/render/mod.rs @@ -1,5 +1,6 @@ pub mod counters_pane; pub mod fmt; +pub mod tcaches_pane; pub mod tiles_pane; pub mod timings_pane; @@ -22,6 +23,7 @@ pub fn draw(f: &mut Frame, app: &mut App) { draw_header(f, chunks[0], app); match app.pane { Pane::Counters => counters_pane::draw(f, chunks[1], app), + Pane::TCaches => tcaches_pane::draw(f, chunks[1], app), Pane::Timings => timings_pane::draw(f, chunks[1], app), Pane::Tiles => tiles_pane::draw(f, chunks[1], app), } @@ -29,7 +31,7 @@ pub fn draw(f: &mut Frame, app: &mut App) { } fn draw_header(f: &mut Frame, area: Rect, app: &App) { - let spans: Vec = [Pane::Counters, Pane::Timings, Pane::Tiles] + let spans: Vec = [Pane::Counters, Pane::TCaches, Pane::Timings, Pane::Tiles] .iter() .flat_map(|&p| { let style = if p == app.pane { diff --git a/crates/surfer/src/render/tcaches_pane.rs b/crates/surfer/src/render/tcaches_pane.rs new file mode 100644 index 0000000..db648d0 --- /dev/null +++ b/crates/surfer/src/render/tcaches_pane.rs @@ -0,0 +1,330 @@ +//! TCaches pane — one row per TCache instance. Visual is a horizontal +//! ring-buffer bar showing the head/min-tail occupancy, followed by +//! human-readable capacity + current length columns. +//! +//! Slot layout per CounterSet (synthesised by `schema::names_for`): +//! 0: capacity +//! 1: head_seq +//! 2..: tail_seq[i] — tails never updated by their consumer remain +//! at the `u64::MAX` sentinel and are ignored. + +use ratatui::{ + Frame, + layout::{Constraint, Direction, Layout, Rect}, + style::{Color, Modifier, Style}, + symbols, + text::{Line, Span}, + widgets::{Axis, Block, Borders, Cell, Chart, Dataset, GraphType, Paragraph, Row, Table}, +}; + +use crate::{app::App, render::fmt::fmt_span_ago, sources::counters::CounterSet}; + +pub fn draw(f: &mut Frame, area: Rect, app: &mut App) { + if app.tcaches.is_empty() { + let block = Block::default().borders(Borders::ALL).title(" tcaches "); + let inner = block.inner(area); + f.render_widget(block, area); + f.render_widget( + Paragraph::new("no tcache counters discovered") + .style(Style::default().fg(Color::DarkGray)), + inner, + ); + return; + } + + if app.drilled_in { + draw_chart(f, area, app); + return; + } + let rows = Layout::default() + .direction(Direction::Vertical) + .constraints([ + Constraint::Percentage(app.split_pct), + Constraint::Percentage(100 - app.split_pct), + ]) + .split(area); + draw_table(f, rows[0], app); + draw_chart(f, rows[1], app); +} + +fn draw_table(f: &mut Frame, area: Rect, app: &mut App) { + let block = Block::default().borders(Borders::ALL).title(" tcaches "); + + let header = Row::new(vec![ + Cell::from("name"), + Cell::from("occupancy"), + Cell::from("head"), + Cell::from("min_tail"), + Cell::from("capacity"), + Cell::from("length"), + ]) + .style(Style::default().add_modifier(Modifier::BOLD).fg(Color::White)) + .height(1); + + // Bar is column 1; width is what's left after the other columns. + // ratatui doesn't expose computed column widths from the Table API, + // so we hard-derive: area.width − name_pct(20%) − caps + spacing. + let inner_w = area.width.saturating_sub(2) as usize; // borders + let name_w = (inner_w * 20).div_ceil(100); + let head_w = 12usize; + let tail_w = 12usize; + let cap_w = 12usize; + let len_w = 12usize; + let bar_w = inner_w.saturating_sub(name_w + head_w + tail_w + cap_w + len_w + 5); // 5 cell gaps + + let rows: Vec = app + .tcaches + .iter() + .map(|set| { + let view = TCacheView::from(set); + let display_name = view.display_name(); + + let bar_line = bar_line(view.capacity, view.head_seq, view.min_tail_seq, bar_w); + + Row::new(vec![ + Cell::from(display_name), + Cell::from(bar_line), + Cell::from(Span::raw(format!("{:>10}", fmt_bytes(view.head_seq)))), + Cell::from(Span::raw(format!("{:>10}", fmt_bytes(view.min_tail_seq)))), + Cell::from(Span::raw(format!("{:>10}", fmt_bytes(view.capacity)))), + Cell::from(Span::raw(format!("{:>10}", fmt_bytes(view.length())))), + ]) + .height(1) + }) + .collect(); + + let widths = [ + Constraint::Percentage(20), + Constraint::Min(10), + Constraint::Length(head_w as u16), + Constraint::Length(tail_w as u16), + Constraint::Length(cap_w as u16), + Constraint::Length(len_w as u16), + ]; + let table = Table::new(rows, widths).header(header).block(block); + app.tcaches_table_state.select(Some(app.tcaches_selection)); + f.render_stateful_widget(table, area, &mut app.tcaches_table_state); +} + +/// Per-line palette for tail consumers; head is always Cyan. +const TAIL_COLORS: &[Color] = &[ + Color::Magenta, + Color::Yellow, + Color::Green, + Color::Red, + Color::LightBlue, + Color::LightMagenta, + Color::LightYellow, + Color::LightGreen, +]; + +struct ChartSeries { + name: String, + color: Color, + data: Vec<(f64, f64)>, +} + +fn draw_chart(f: &mut Frame, area: Rect, app: &App) { + let Some(set) = app.tcaches.get(app.tcaches_selection) else { + f.render_widget(Block::default().borders(Borders::ALL).title(" history "), area); + return; + }; + let view = TCacheView::from(set); + + // Collect head + every active tail. Slot 1 = head_seq, + // slot 2..N = tail_seq[i]. Slot 0 (capacity) is constant so its + // delta is always zero — skip. + let mut series: Vec = Vec::new(); + if let Some(hist) = set.history.get(1) { + let data: Vec<(f64, f64)> = + hist.iter().enumerate().map(|(i, &v)| (i as f64, v as f64)).collect(); + series.push(ChartSeries { name: "head".to_string(), color: Color::Cyan, data }); + } + for slot_idx in 2..set.current.len() { + // u64::MAX = unused slot (no consumer registered). Real + // consumers write their tail value, which can be 0 if + // registered before any production. + let current = set.current.get(slot_idx).copied().unwrap_or(u64::MAX); + if current == u64::MAX { + continue; + } + let Some(hist) = set.history.get(slot_idx) else { continue }; + if hist.is_empty() { + continue; + } + let consumer_idx = slot_idx - 2; + let color = TAIL_COLORS[consumer_idx % TAIL_COLORS.len()]; + let data: Vec<(f64, f64)> = + hist.iter().enumerate().map(|(i, &v)| (i as f64, v as f64)).collect(); + series.push(ChartSeries { name: format!("tail_{consumer_idx}"), color, data }); + } + + let title = format!(" {} — 1s deltas (bytes/s) ", view.display_name()); + let block = Block::default().borders(Borders::ALL).title(title); + + let any_data = series.iter().any(|s| !s.data.is_empty()); + if !any_data { + f.render_widget(block, area); + return; + } + + let y_max = + series.iter().flat_map(|s| s.data.iter().map(|(_, y)| *y)).fold(0.0f64, f64::max).max(1.0); + let n = series.iter().map(|s| s.data.len()).max().unwrap_or(0); + let x_max = n.saturating_sub(1).max(1) as f64; + + let datasets: Vec = series + .iter() + .map(|s| { + Dataset::default() + .name(s.name.clone()) + .marker(symbols::Marker::Braille) + .style(Style::default().fg(s.color)) + .graph_type(GraphType::Line) + .data(&s.data) + }) + .collect(); + + let x_labels = vec![ + Line::from(format!("-{}", fmt_span_ago(n))), + Line::from(format!("-{}", fmt_span_ago(n / 2))), + Line::from("now"), + ]; + let y_labels = vec![ + Line::from("0"), + Line::from(format!("{}/s", fmt_bytes((y_max / 2.0).round() as u64))), + Line::from(format!("{}/s", fmt_bytes(y_max.round() as u64))), + ]; + + let chart = Chart::new(datasets) + .block(block) + .x_axis( + Axis::default() + .bounds([0.0, x_max]) + .labels(x_labels) + .style(Style::default().fg(Color::DarkGray)), + ) + .y_axis( + Axis::default() + .bounds([0.0, y_max * 1.1]) + .labels(y_labels) + .style(Style::default().fg(Color::DarkGray)), + ); + f.render_widget(chart, area); +} + +/// Decoded view of a tcache counter file. +struct TCacheView<'a> { + name: &'a str, + capacity: u64, + head_seq: u64, + /// Minimum of every non-sentinel tail. If all tails are still at + /// the `u64::MAX` sentinel (no consumer ever called free yet), + /// falls back to `head_seq` so length renders as 0. + min_tail_seq: u64, +} + +impl<'a> TCacheView<'a> { + fn from(set: &'a CounterSet) -> Self { + let capacity = set.current.first().copied().unwrap_or(0); + let head_seq = set.current.get(1).copied().unwrap_or(0); + // Distinguish "no real consumer" from "consumer caught up": + // - At least one real (non-sentinel) tail → use the minimum. Sentinel slots are + // unused; treat them as `head_seq` so they don't drag the minimum. + // - All tails are sentinel → no consumer has ever published progress on this + // TCache. Show the ring contents as the producer sees them: from `head - + // capacity` up to `head` (clamped at 0 for pre-wrap producers). + let any_real = set.current.iter().skip(2).any(|&t| t != u64::MAX); + let min_tail_seq = if any_real { + set.current + .iter() + .skip(2) + .copied() + .map(|t| if t == u64::MAX { head_seq } else { t }) + .min() + .unwrap_or(head_seq) + } else { + head_seq.saturating_sub(capacity) + }; + Self { name: &set.name, capacity, head_seq, min_tail_seq } + } + + fn display_name(&self) -> String { + self.name.strip_prefix("tcache-").unwrap_or(self.name).to_string() + } + + fn length(&self) -> u64 { + self.head_seq.saturating_sub(self.min_tail_seq) + } +} + +/// Build a horizontal bar showing the ring occupancy as a styled +/// `Line`. Filled section is `[min_tail_pos, head_pos)` wrapping when +/// `head < tail`. Width is `width` chars; `[`/`]` brackets sit outside +/// the count. +fn bar_line(capacity: u64, head_seq: u64, min_tail_seq: u64, width: usize) -> Line<'static> { + if width < 3 || capacity == 0 { + return Line::from(""); + } + let inner = width.saturating_sub(2); + // Modular positions inside the ring. + let mask = capacity - 1; + let head_pos = head_seq & mask; + let tail_pos = min_tail_seq & mask; + + // Map a byte offset → bar character index. + // tail_c uses floor (the byte sits inside char `tail_c`), head_c + // uses ceil so any non-zero backlog always spans at least one + // visible char even when head_pos and tail_pos round to the same + // floor. + let to_char_floor = + |bytes: u64| -> usize { ((bytes as u128 * inner as u128) / capacity as u128) as usize }; + let to_char_ceil = |bytes: u64| -> usize { + ((bytes as u128 * inner as u128).div_ceil(capacity as u128)) as usize + }; + let head_c = to_char_ceil(head_pos); + let tail_c = to_char_floor(tail_pos); + let length = head_seq.saturating_sub(min_tail_seq); + let full = length >= capacity; + + let bar: String = (0..inner) + .map(|i| { + if full { + '█' + } else if length == 0 { + // Empty queue (head == min_tail) — render a vertical + // marker at the head/tail position so the user can + // still see where the cursor sits. + if i == head_c { '│' } else { '░' } + } else if head_c >= tail_c { + if i >= tail_c && i < head_c { '█' } else { '░' } + } else { + // Wrapped. + if i >= tail_c || i < head_c { '█' } else { '░' } + } + }) + .collect(); + + Line::from(vec![ + Span::raw("["), + Span::styled(bar, Style::default().fg(Color::Cyan)), + Span::raw("]"), + ]) +} + +fn fmt_bytes(bytes: u64) -> String { + const KB: u64 = 1 << 10; + const MB: u64 = 1 << 20; + const GB: u64 = 1 << 30; + if bytes == 0 { + "0B".to_string() + } else if bytes >= GB { + format!("{:.2}GB", bytes as f64 / GB as f64) + } else if bytes >= MB { + format!("{:.2}MB", bytes as f64 / MB as f64) + } else if bytes >= KB { + format!("{:.2}KB", bytes as f64 / KB as f64) + } else { + format!("{bytes}B") + } +} diff --git a/crates/surfer/src/schema.rs b/crates/surfer/src/schema.rs index 7698f63..f21bd4f 100644 --- a/crates/surfer/src/schema.rs +++ b/crates/surfer/src/schema.rs @@ -21,5 +21,13 @@ pub fn names_for(file_name: &str, slot_count: usize) -> (Vec, bool) { if let Some(arr) = lookup(file_name) { return (arr.iter().map(|s| s.to_string()).collect(), true); } + // TCache files have a fixed semantic layout decoded from slot count. + if file_name.starts_with("tcache-") && slot_count >= 2 { + let mut names = vec!["capacity".to_string(), "head_seq".to_string()]; + for i in 0..(slot_count - 2) { + names.push(format!("tail_{i}")); + } + return (names, true); + } ((0..slot_count).map(|i| format!("slot_{i}")).collect(), false) } diff --git a/crates/surfer/src/sources/counters.rs b/crates/surfer/src/sources/counters.rs index 8109977..8742dd5 100644 --- a/crates/surfer/src/sources/counters.rs +++ b/crates/surfer/src/sources/counters.rs @@ -42,6 +42,11 @@ pub struct CounterSet { bucket_start: Vec, /// Per-slot ring of completed-bucket deltas (newest at back). pub history: Vec>, + /// `false` until the first `sample()` call. The first sample + /// primes `previous` and `bucket_start` so initial deltas start + /// at 0 rather than a wraparound (matters for slots initialised + /// to non-zero sentinels like `u64::MAX`). + primed: bool, } // SAFETY: `base` points at an mmap'd shmem file; the underlying memory @@ -70,6 +75,7 @@ impl CounterSet { previous: vec![0; slot_count], bucket_start: vec![0; slot_count], history: (0..slot_count).map(|_| VecDeque::with_capacity(BUCKET_HISTORY_LEN)).collect(), + primed: false, }) } @@ -82,19 +88,39 @@ impl CounterSet { for i in 0..self.slot_count { self.current[i] = unsafe { (*self.base.add(i)).load(Ordering::Relaxed) }; } + if !self.primed { + // Prime previous/bucket_start to the first observed values + // so deltas start at 0. Slots initialised to non-zero + // sentinels (e.g. tcache tails = u64::MAX) would otherwise + // produce a wraparound delta on the first roll. + self.previous.copy_from_slice(&self.current); + self.bucket_start.copy_from_slice(&self.current); + self.primed = true; + } } pub fn slot_count(&self) -> usize { self.slot_count } - /// Close the current 12 s bucket: for each slot, compute + /// Close the current bucket: for each slot, compute /// `current - bucket_start` and push to the per-slot history ring /// (drop oldest when full). Then snapshot `current` into /// `bucket_start` so the next bucket starts accumulating from now. + /// + /// Sentinel handling: when either endpoint of the delta is + /// `u64::MAX` (TCache tail "unused-slot" sentinel; not a real + /// metric value anywhere else), the delta is recorded as 0. This + /// suppresses garbage spikes when a slot transitions to/from + /// sentinel state — common at startup if the mmap file was reused + /// from a previous run. pub fn roll_bucket(&mut self) { for i in 0..self.slot_count { - let delta = self.current[i].wrapping_sub(self.bucket_start[i]); + let delta = if self.current[i] == u64::MAX || self.bucket_start[i] == u64::MAX { + 0 + } else { + self.current[i].wrapping_sub(self.bucket_start[i]) + }; let h = &mut self.history[i]; if h.len() == BUCKET_HISTORY_LEN { h.pop_front();