diff --git a/crates/beacon_state/src/tile.rs b/crates/beacon_state/src/tile.rs index 1ce22a8..412ab0e 100644 --- a/crates/beacon_state/src/tile.rs +++ b/crates/beacon_state/src/tile.rs @@ -1710,8 +1710,8 @@ mod tests { let ticker = SlotTicker::new(genesis, Duration::from_secs(12), Duration::from_secs(4)); let gossip_p = TCache::producer("test_gossip", 1 << 20); let event_p = TCache::producer("test_event", 1 << 20); - let gossip_c = gossip_p.cache_ref().random_access(true).unwrap(); - let rpc_c = event_p.cache_ref().random_access(true).unwrap(); + let gossip_c = gossip_p.cache_ref().random_access("test_gossip", true).unwrap(); + let rpc_c = event_p.cache_ref().random_access("test_event", true).unwrap(); BeaconStateTile::new_heap(ticker, SpecConfig::mainnet(), gossip_c, rpc_c, &[]) } diff --git a/crates/beacon_state/tests/common.rs b/crates/beacon_state/tests/common.rs index 433fd6b..d06d438 100644 --- a/crates/beacon_state/tests/common.rs +++ b/crates/beacon_state/tests/common.rs @@ -154,8 +154,8 @@ impl Harness { let gossip_in_producer = TCache::producer("gossip_in", 1 << 24); let rpc_in_producer = TCache::producer("rpc_in", 1 << 24); let gossip_consumer = - gossip_in_producer.cache_ref().random_access(true).expect("gossip ra"); - let rpc_consumer = rpc_in_producer.cache_ref().random_access(true).expect("rpc ra"); + gossip_in_producer.cache_ref().random_access("test", true).expect("gossip ra"); + let rpc_consumer = rpc_in_producer.cache_ref().random_access("test", true).expect("rpc ra"); let tile = build_tile(ticker, gossip_consumer, rpc_consumer); diff --git a/crates/bin/src/main.rs b/crates/bin/src/main.rs index 87c5f6e..a9461f2 100644 --- a/crates/bin/src/main.rs +++ b/crates/bin/src/main.rs @@ -50,14 +50,17 @@ fn main() -> Result<(), Box> { // TCaches let incoming_gossip_producer = TCache::producer("incoming_gossip", config.incoming_gossip_tcache_size()); - let incoming_gossip_consumer = incoming_gossip_producer.cache_ref().consumer()?; + let incoming_gossip_consumer = + incoming_gossip_producer.cache_ref().consumer("incoming_gossip")?; let ssz_gossip_producer = TCache::producer("ssz_gossip", config.incoming_gossip_ssz_tcache_size()); - let ssz_gossip_consumer = ssz_gossip_producer.cache_ref().random_access(true)?; + let ssz_gossip_consumer = + ssz_gossip_producer.cache_ref().random_access("bs_ssz_gossip", true)?; let outgoing_gossip_producer = TCache::producer("outgoing_gossip", config.outgoing_gossip_tcache_size()); let incoming_rpc_producer = TCache::producer("incoming_rpc", config.incoming_rpc_tcache_size()); - let incoming_rpc_consumer = incoming_rpc_producer.cache_ref().random_access(true)?; + let incoming_rpc_consumer = + incoming_rpc_producer.cache_ref().random_access("bs_incoming_rpc", true)?; // rpc producer let outgoing_rpc_producer = TCache::producer("outgoing_rpc", config.outgoing_rpc_tcache_size()); @@ -86,9 +89,11 @@ fn main() -> Result<(), Box> { ); let p2p_context = Context { gossip_producer: incoming_gossip_producer, - gossip_consumer: outgoing_gossip_producer.cache_ref().random_access(true)?, + gossip_consumer: outgoing_gossip_producer + .cache_ref() + .random_access("p2p_outgoing_gossip", true)?, rpc_producer: incoming_rpc_producer, - rpc_consumer: outgoing_rpc_producer.cache_ref().random_access(true)?, + rpc_consumer: outgoing_rpc_producer.cache_ref().random_access("p2p_outgoing_rpc", true)?, identify: Some(ProtoIdentify::from((&config.identify()?, &keypair))), }; diff --git a/crates/common/src/spine/tcache.rs b/crates/common/src/spine/tcache.rs index 4bf839d..0b146f1 100644 --- a/crates/common/src/spine/tcache.rs +++ b/crates/common/src/spine/tcache.rs @@ -100,7 +100,7 @@ impl TCache { self.name } - pub fn consumer(&self) -> Result { + pub fn consumer(&self, name: &'static str) -> Result { // find start seq let seq = self.head.seq.load(Ordering::Acquire); @@ -116,16 +116,22 @@ impl TCache { // Publish a baseline tail so surfer's chart has a value to plot // before the consumer has actually called free. self.record_tail(index, seq); + self.record_consumer_name(index, name); Ok(Consumer { cache: TCacheRef { cache: addr_of!(*self) as *const c_void }, index, seq, next_seq: seq, + timer: self.create_consumer_timer(name), }) } - pub fn random_access(&self, auto_free: bool) -> Result { + pub fn random_access( + &self, + name: &'static str, + auto_free: bool, + ) -> Result { // find start seq let seq = self.head.seq.load(Ordering::Acquire); @@ -139,15 +145,40 @@ impl TCache { .ok_or(Error::MaxConsumers)?; self.record_tail(index, seq); + self.record_consumer_name(index, name); Ok(RandomAccessConsumer { cache: TCacheRef { cache: addr_of!(*self) as *const c_void }, index, active: Buckets::new(32 * 1024, self.len as u64), auto_free, + timer: self.create_consumer_timer(name), }) } + #[inline] + fn record_consumer_name(&self, idx: usize, name: &'static str) { + if let Some(m) = &self.metrics { + m.set_consumer_name(idx, name); + } + } + + /// Build a per-consumer flux Timer named + /// `tcache-{tcache_name}-{consumer_name}`. Result file paths are + /// `timing-tcache-{tcache_name}-{consumer_name}` and + /// `latency-tcache-{tcache_name}-{consumer_name}` under flux's + /// `shmem/queues` directory — surfer picks up the latency queue + /// via the existing timings discovery. + /// + /// Returns `None` when this TCache has no metrics (unnamed or + /// metrics file open failed) so the consumer's read path is a + /// no-op for latency. + fn create_consumer_timer(&self, consumer_name: &'static str) -> Option { + self.metrics.as_ref()?; + let label = format!("tcache-{}-{}", self.name, consumer_name); + Some(flux::Timer::new("silver", &label)) + } + fn index(&self, seq: u64) -> usize { (seq & (self.len - 1) as u64) as usize } @@ -623,7 +654,7 @@ mod tests { let mut producer = TCache::producer("test_tcache", TCACHE_SIZE); let mut consumers: Vec = - (0..CONSUMERS).map(|_| producer.cache_ref().consumer().unwrap()).collect(); + (0..CONSUMERS).map(|_| producer.cache_ref().consumer("test").unwrap()).collect(); let done = Arc::new(AtomicBool::new(false)); @@ -677,7 +708,7 @@ mod tests { let mp = TCache::multi_producer("test_mp", TCACHE_SIZE); let mut consumers: Vec = - (0..CONSUMERS).map(|_| mp.cache_ref().consumer().unwrap()).collect(); + (0..CONSUMERS).map(|_| mp.cache_ref().consumer("test").unwrap()).collect(); // Spawn consumers BEFORE producers so they observe the full stream // from seq 0 and don't miss early commits. @@ -739,7 +770,7 @@ mod tests { #[test] fn produce_consume() { let mut producer = TCache::producer("test_buckets", 2 << 14); - let mut consumer = producer.cache_ref().consumer().unwrap(); + let mut consumer = producer.cache_ref().consumer("test").unwrap(); let prod = std::thread::spawn(move || { let mut rng = rand::thread_rng(); diff --git a/crates/common/src/spine/tcache/consumer.rs b/crates/common/src/spine/tcache/consumer.rs index cac19e1..d2d7c94 100644 --- a/crates/common/src/spine/tcache/consumer.rs +++ b/crates/common/src/spine/tcache/consumer.rs @@ -1,6 +1,6 @@ use std::{ops::Deref, sync::atomic::Ordering}; -use flux::timing::Nanos; +use flux::{Timer, timing::Nanos}; use crate::{GossipMsgOut, TCacheError, TCacheRef}; @@ -54,6 +54,11 @@ pub struct Consumer { pub(super) index: usize, pub(super) seq: u64, pub(super) next_seq: u64, + /// Per-consumer flux Timer emitting `latency-tcache-{tcache}-{name}` + /// — measures elapsed from `slot.reserve_ns` to first read. + /// `None` if the TCache wasn't named or the Timer queue couldn't + /// be opened. + pub(super) timer: Option, } impl Consumer { @@ -66,6 +71,9 @@ impl Consumer { })?; if !data.is_empty() { + if let Some(timer) = &mut self.timer { + timer.emit_latency_from_nanos(ts, Nanos::now()); + } return Ok((data, ts)); } self.seq = self.next_seq; @@ -83,7 +91,7 @@ impl Consumer { } /// Consumer that supports random access to messages between its tail and buffer -/// head. Tail is tracked externally. +/// head. Tail is tracked externally. pub struct RandomAccessConsumer { pub(super) cache: TCacheRef, pub(super) index: usize, @@ -92,11 +100,19 @@ pub struct RandomAccessConsumer { // If set `free` is called on drop of every acquired read, ensuring the // consumer tail is published. pub(super) auto_free: bool, + /// Per-consumer flux Timer emitting `latency-tcache-{tcache}-{name}` + /// — measures elapsed from `slot.reserve_ns` at acquire time. + pub(super) timer: Option, } impl RandomAccessConsumer { pub fn acquire(&mut self, read: TCacheRead) -> AcquiredRead { self.active.acquire(read.seq); + if let Some(timer) = &mut self.timer { + if let Ok(reserve_ns) = read.cache_ts() { + timer.emit_latency_from_nanos(reserve_ns, Nanos::now()); + } + } AcquiredRead { consumer: self as *const Self, read } } @@ -337,7 +353,7 @@ mod tests { #[test] fn acquire_release_cycle_reads_buffer() { let mut producer = TCache::producer("test_consumer", 1 << 16); - let mut consumer = producer.cache_ref().random_access(false).unwrap(); + let mut consumer = producer.cache_ref().random_access("test", false).unwrap(); producer.publish_head(); let reads: Vec = @@ -367,7 +383,7 @@ mod tests { const TOTAL: usize = 1000; let mut producer = TCache::producer("test_consumer", CACHE); - let mut consumer = producer.cache_ref().random_access(false).unwrap(); + let mut consumer = producer.cache_ref().random_access("test", false).unwrap(); producer.publish_head(); let mut held: Vec = Vec::new(); @@ -403,7 +419,7 @@ mod tests { const MSG_LEN: usize = 8 * 1024; let mut producer = TCache::producer("test_consumer", CACHE); - let mut consumer = producer.cache_ref().random_access(false).unwrap(); + let mut consumer = producer.cache_ref().random_access("test", false).unwrap(); producer.publish_head(); // The "victim" — held across heavy downstream production. diff --git a/crates/common/src/spine/tcache/metrics.rs b/crates/common/src/spine/tcache/metrics.rs index d96f81c..4e19a95 100644 --- a/crates/common/src/spine/tcache/metrics.rs +++ b/crates/common/src/spine/tcache/metrics.rs @@ -5,12 +5,16 @@ //! 1: head_seq — updated by `Producer::publish_head`. //! 2..2+N: tail_seq[i] — updated by consumer `i`'s `free`. //! -//! Surfer renders the file as a ring-buffer occupancy bar -//! (head − min_tail / capacity) plus current length, decoded -//! dynamically from the file's slot count. +//! Companion file `tcache-names-{name}` stores per-consumer names +//! as fixed-width zero-padded UTF-8 buffers (`MAX_CONSUMERS × NAME_LEN` +//! bytes). Each consumer writes its name at registration time so +//! surfer can label rows by consumer identity instead of positional +//! `tail_{i}`. use std::{ + fs::OpenOptions, io, + os::fd::AsRawFd, sync::atomic::{AtomicU64, Ordering}, }; @@ -22,11 +26,19 @@ const FIXED_SLOTS: usize = 2; const CAPACITY_IDX: usize = 0; /// Index of the head_seq slot. const HEAD_IDX: usize = 1; +/// Bytes per consumer-name slot. UTF-8 zero-padded; tail bytes zero. +/// 32 fits typical labels like `data_columns_gossip` with headroom. +pub const NAME_LEN: usize = 32; pub struct TCacheMetrics { base: *mut AtomicU64, n_consumers: usize, map_bytes: usize, + /// Companion mmap for consumer names. `n_consumers * NAME_LEN` + /// bytes. `None` if the names file couldn't be opened (e.g. + /// permissions); name writes become no-ops. + names_base: *mut u8, + names_bytes: usize, } // SAFETY: `base` points at an `mmap(MAP_SHARED)` region; every access @@ -36,15 +48,17 @@ unsafe impl Send for TCacheMetrics {} unsafe impl Sync for TCacheMetrics {} impl TCacheMetrics { - /// mmap the `counters-tcache-{name}` file under + /// mmap the `counters-tcache-{name}` file and companion + /// `tcache-names-{name}` file under /// `flux::utils::directories::local_share_dir() / "silver" / shmem / - /// queues`, pre-populate the capacity slot. + /// queues`. Pre-populates the capacity slot, fills tail slots with the + /// `u64::MAX` sentinel, and zeroes the names buffer. pub fn new(name: &str, n_consumers: usize, capacity: u64) -> io::Result { let slots = FIXED_SLOTS + n_consumers; let map_bytes = slots * std::mem::size_of::(); - let file_name = format!("tcache-{name}"); + let counters_file = format!("tcache-{name}"); let base_dir = flux::utils::directories::local_share_dir(); - let base = mmap_counters_file(&base_dir, "silver", &file_name, map_bytes)?; + let base = mmap_counters_file(&base_dir, "silver", &counters_file, map_bytes)?; // SAFETY: ptr is valid for `slots` AtomicU64s. unsafe { (*base.add(CAPACITY_IDX)).store(capacity, Ordering::Relaxed); @@ -55,7 +69,15 @@ impl TCacheMetrics { (*base.add(FIXED_SLOTS + i)).store(u64::MAX, Ordering::Relaxed); } } - Ok(Self { base, n_consumers, map_bytes }) + + let names_bytes = n_consumers * NAME_LEN; + let names_file = format!("tcache-names-{name}"); + let names_base = mmap_bytes_file(&base_dir, "silver", &names_file, names_bytes)?; + // SAFETY: pointer is valid for `names_bytes`. Zero on init so + // unset slots have empty names. + unsafe { std::ptr::write_bytes(names_base, 0, names_bytes) }; + + Ok(Self { base, n_consumers, map_bytes, names_base, names_bytes }) } #[inline] @@ -72,12 +94,65 @@ impl TCacheMetrics { (*self.base.add(FIXED_SLOTS + consumer_idx)).store(v, Ordering::Relaxed); } } + + /// Write a consumer's name into its slot. Truncated to `NAME_LEN` + /// bytes. Trailing bytes are zero-padded so readers can find the + /// terminator. + pub fn set_consumer_name(&self, idx: usize, name: &str) { + debug_assert!(idx < self.n_consumers); + if idx >= self.n_consumers { + return; + } + let bytes = name.as_bytes(); + let len = bytes.len().min(NAME_LEN); + // SAFETY: idx bounded by `n_consumers`; `idx * NAME_LEN + NAME_LEN` + // is within `names_bytes`. + unsafe { + let dst = self.names_base.add(idx * NAME_LEN); + std::ptr::write_bytes(dst, 0, NAME_LEN); + std::ptr::copy_nonoverlapping(bytes.as_ptr(), dst, len); + } + } } impl Drop for TCacheMetrics { fn drop(&mut self) { - // SAFETY: `base` was returned by mmap with `map_bytes`; no other - // references into this region outlive `self`. - unsafe { libc::munmap(self.base.cast::(), self.map_bytes) }; + // SAFETY: both pointers came from `mmap` with the recorded sizes; + // no other references into either region outlive `self`. + unsafe { + libc::munmap(self.base.cast::(), self.map_bytes); + libc::munmap(self.names_base.cast::(), self.names_bytes); + } + } +} + +/// Open/create + mmap a raw bytes shmem file under flux's +/// `shmem/queues` directory. Layered alongside `mmap_counters_file` +/// for non-u64 layouts. +fn mmap_bytes_file( + base_dir: &std::path::Path, + app_name: &str, + file_name: &str, + bytes: usize, +) -> io::Result<*mut u8> { + let dir = flux::utils::directories::shmem_dir_queues_with_base(base_dir, app_name); + std::fs::create_dir_all(&dir)?; + let path = dir.join(file_name); + let file = + OpenOptions::new().read(true).write(true).create(true).truncate(false).open(&path)?; + file.set_len(bytes as u64)?; + let ptr = unsafe { + libc::mmap( + std::ptr::null_mut(), + bytes, + libc::PROT_READ | libc::PROT_WRITE, + libc::MAP_SHARED, + file.as_raw_fd(), + 0, + ) + }; + if ptr == libc::MAP_FAILED { + return Err(io::Error::last_os_error()); } + Ok(ptr.cast::()) } diff --git a/crates/e2e/src/stack.rs b/crates/e2e/src/stack.rs index dff2f54..6642376 100644 --- a/crates/e2e/src/stack.rs +++ b/crates/e2e/src/stack.rs @@ -201,29 +201,31 @@ impl PublisherStack { // TCaches needed by the network tile on the publisher side. // gossip_in: network writes raw inbound gossip here; nobody reads. let gossip_in_producer = TCache::producer("e2e_stack", TCACHE_SIZE); - let gossip_in_consumer = gossip_in_producer.cache_ref().consumer().ok(); + let gossip_in_consumer = gossip_in_producer.cache_ref().consumer("e2e").ok(); // gossip_out: network reads outbound bytes from here via random-access. // The publisher's mcache TCache IS the gossip_out source — same cache. let mcache_producer = TCache::producer("e2e_stack", TCACHE_SIZE); - let gossip_out_ra = mcache_producer.cache_ref().random_access(true).expect("random_access"); + let gossip_out_ra = + mcache_producer.cache_ref().random_access("e2e", true).expect("random_access"); // rpc_in: network writes inbound RPC payload bytes here; tests // (multipart-RPC) read via `rpc_in_ra`. Regular consumer also // attached for keep-alive plus future controller use. let rpc_in_producer = TCache::producer("e2e_stack", TCACHE_SIZE); - let rpc_in_consumer = rpc_in_producer.cache_ref().consumer().ok(); + let rpc_in_consumer = rpc_in_producer.cache_ref().consumer("e2e").ok(); let rpc_in_ra = - rpc_in_producer.cache_ref().random_access(true).expect("rpc_in random_access"); + rpc_in_producer.cache_ref().random_access("e2e", true).expect("rpc_in random_access"); // rpc_out: tests reserve here to inject outbound BeaconBlock // chunks; the network tile reads via the random-access handle // wired into `Context.rpc_consumer`. let rpc_out_producer = TCache::producer("e2e_stack", TCACHE_SIZE); - let rpc_out_ra = rpc_out_producer.cache_ref().random_access(true).expect("random_access"); + let rpc_out_ra = + rpc_out_producer.cache_ref().random_access("e2e", true).expect("random_access"); // gossip_out handle given to network. let gossip_out_ra_for_network = - mcache_producer.cache_ref().random_access(true).expect("random_access"); + mcache_producer.cache_ref().random_access("e2e", true).expect("random_access"); let context = Context { gossip_producer: gossip_in_producer, @@ -306,24 +308,25 @@ impl EchoStack { // Inbound gossip raw bytes: network writes, compression consumes. let gossip_in_producer = TCache::producer("e2e_stack", TCACHE_SIZE); - let gossip_in_consumer = gossip_in_producer.cache_ref().consumer().expect("consumer"); + let gossip_in_consumer = gossip_in_producer.cache_ref().consumer("e2e").expect("consumer"); // SSZ output: compression writes, stats-sink reads. let ssz_producer = TCache::producer("e2e_stack", TCACHE_SIZE); - let ssz_consumer = ssz_producer.cache_ref().random_access(true).expect("consumer"); + let ssz_consumer = ssz_producer.cache_ref().random_access("e2e", true).expect("consumer"); // Protobuf mcache: compression writes; network reads via random_access // when re-forwarding. Not exercised in one-way test but wiring must // exist. let protobuf_producer = TCache::producer("e2e_stack", TCACHE_SIZE); let protobuf_ra_for_network = - protobuf_producer.cache_ref().random_access(true).expect("random_access"); + protobuf_producer.cache_ref().random_access("e2e", true).expect("random_access"); // RPC caches: dummy. let rpc_in_producer = TCache::producer("e2e_stack", TCACHE_SIZE); - let rpc_in_consumer = rpc_in_producer.cache_ref().consumer().ok(); + let rpc_in_consumer = rpc_in_producer.cache_ref().consumer("e2e").ok(); let rpc_out_producer = TCache::producer("e2e_stack", TCACHE_SIZE); - let rpc_out_ra = rpc_out_producer.cache_ref().random_access(true).expect("random_access"); + let rpc_out_ra = + rpc_out_producer.cache_ref().random_access("e2e", true).expect("random_access"); let context = Context { gossip_producer: gossip_in_producer, diff --git a/crates/e2e/tests/checkpoint_load.rs b/crates/e2e/tests/checkpoint_load.rs index d1d924c..5ad7632 100644 --- a/crates/e2e/tests/checkpoint_load.rs +++ b/crates/e2e/tests/checkpoint_load.rs @@ -47,8 +47,8 @@ fn finalized_state_loads() { let ticker = SlotTicker::new(genesis_time, Duration::from_secs(12), Duration::from_secs(4)); let gossip_p = TCache::producer("gossip_in", 1 << 20); let rpc_p = TCache::producer("rpc_in", 1 << 20); - let gossip_c = gossip_p.cache_ref().random_access(false).unwrap(); - let rpc_c = rpc_p.cache_ref().random_access(false).unwrap(); + let gossip_c = gossip_p.cache_ref().random_access("test", false).unwrap(); + let rpc_c = rpc_p.cache_ref().random_access("test", false).unwrap(); let mut tile = BeaconStateTile::new_heap( ticker, diff --git a/crates/e2e/tests/sync_pm_bs.rs b/crates/e2e/tests/sync_pm_bs.rs index db6b45d..1bf06ad 100644 --- a/crates/e2e/tests/sync_pm_bs.rs +++ b/crates/e2e/tests/sync_pm_bs.rs @@ -150,8 +150,8 @@ fn pm_drives_two_batches_against_real_checkpoint() { let gossip_p = TCache::producer("gossip_in", 1 << 20); let mut rpc_p = TCache::producer("rpc_in", 1 << 22); // largest mainnet block ~250KB × 4 - let gossip_c = gossip_p.cache_ref().random_access(true).expect("gossip ra"); - let rpc_c = rpc_p.cache_ref().random_access(true).expect("rpc ra"); + let gossip_c = gossip_p.cache_ref().random_access("test", true).expect("gossip ra"); + let rpc_c = rpc_p.cache_ref().random_access("test", true).expect("rpc ra"); let mut bs = BeaconStateTile::new_heap( ticker, diff --git a/crates/e2e/tests/sync_pm_bs_one_batch.rs b/crates/e2e/tests/sync_pm_bs_one_batch.rs index ac9c425..49e5087 100644 --- a/crates/e2e/tests/sync_pm_bs_one_batch.rs +++ b/crates/e2e/tests/sync_pm_bs_one_batch.rs @@ -161,8 +161,8 @@ fn pm_drives_single_big_batch_against_real_checkpoint() { // tcache-internal accounting. let rpc_cap_bytes = ((n_blocks as usize) * 300 * 1024).next_power_of_two(); let mut rpc_p = TCache::producer("rpc_in", rpc_cap_bytes); - let gossip_c = gossip_p.cache_ref().random_access(true).expect("gossip ra"); - let rpc_c = rpc_p.cache_ref().random_access(true).expect("rpc ra"); + let gossip_c = gossip_p.cache_ref().random_access("test", true).expect("gossip ra"); + let rpc_c = rpc_p.cache_ref().random_access("test", true).expect("rpc ra"); let mut bs = BeaconStateTile::new_heap( ticker, diff --git a/crates/gossip/src/control.rs b/crates/gossip/src/control.rs index 084f7cb..345c496 100644 --- a/crates/gossip/src/control.rs +++ b/crates/gossip/src/control.rs @@ -454,7 +454,7 @@ mod tests { use crate::generated::RPCView; fn read_bytes(tc: TCacheRead, producer: &silver_common::TProducer) -> Vec { - let mut consumer = producer.cache_ref().random_access(false).unwrap(); + let mut consumer = producer.cache_ref().random_access("test", false).unwrap(); let read = consumer.acquire(tc); let (bytes, _) = read.buffer().unwrap(); bytes.to_vec() diff --git a/crates/gossip/src/mcache.rs b/crates/gossip/src/mcache.rs index 1ac280b..fb057ce 100644 --- a/crates/gossip/src/mcache.rs +++ b/crates/gossip/src/mcache.rs @@ -171,7 +171,7 @@ mod tests { fn mk_mcache() -> (MessageCache, silver_common::TProducer) { let producer = TCache::producer("mcache_test", 1 << 14); - let consumer = producer.cache_ref().random_access(false).unwrap(); + let consumer = producer.cache_ref().random_access("test", false).unwrap(); (MessageCache::new(consumer), producer) } diff --git a/crates/gossip/src/tile.rs b/crates/gossip/src/tile.rs index 50bcdfe..d1425d3 100644 --- a/crates/gossip/src/tile.rs +++ b/crates/gossip/src/tile.rs @@ -49,7 +49,8 @@ impl GossipHandler { protobuf_gossip_publish: TProducer, fork_digest_hex: String, ) -> Result { - let mcache_consumer = protobuf_gossip_publish.cache_ref().random_access(false)?; + let mcache_consumer = + protobuf_gossip_publish.cache_ref().random_access("gossip_mcache", false)?; let mcache = MessageCache::new(mcache_consumer); Ok(Self { diff --git a/crates/network/benches/quic_basic.rs b/crates/network/benches/quic_basic.rs index c252990..e85d133 100644 --- a/crates/network/benches/quic_basic.rs +++ b/crates/network/benches/quic_basic.rs @@ -40,11 +40,12 @@ pub fn broadcast(c: &mut Criterion) { x.iter_batched( || { let gi_producer = TCache::producer("bench_gi", 2 << 24); - let mut gi_consumer = gi_producer.cache_ref().consumer().unwrap(); + let mut gi_consumer = gi_producer.cache_ref().consumer("bench").unwrap(); let go_producer = TCache::producer("bench_small", 32); - let go_consumer = go_producer.cache_ref().random_access(true).unwrap(); + let go_consumer = + go_producer.cache_ref().random_access("bench", true).unwrap(); let rpc_in = TCache::producer("bench_small", 32); - let rpc_out = rpc_in.cache_ref().random_access(true).unwrap(); + let rpc_out = rpc_in.cache_ref().random_access("bench", true).unwrap(); let (mut server_tile, server_id) = { let secret = secp256k1::SecretKey::new(&mut rng); @@ -114,9 +115,10 @@ pub fn broadcast(c: &mut Criterion) { let gi_producer = TCache::producer("bench_small", 32); let mut go_producer = TCache::producer("bench_go", 2 << 28); - let go_consumer = go_producer.cache_ref().random_access(true).unwrap(); + let go_consumer = + go_producer.cache_ref().random_access("bench", true).unwrap(); let rpc_in = TCache::producer("bench_small", 32); - let rpc_out = rpc_in.cache_ref().random_access(false).unwrap(); + let rpc_out = rpc_in.cache_ref().random_access("bench", false).unwrap(); let context = Context { gossip_producer: gi_producer, diff --git a/crates/network/benches/quic_pingpong.rs b/crates/network/benches/quic_pingpong.rs index 43896fd..4f0ddc1 100644 --- a/crates/network/benches/quic_pingpong.rs +++ b/crates/network/benches/quic_pingpong.rs @@ -40,11 +40,12 @@ pub fn broadcast(c: &mut Criterion) { || { let running = Arc::new(AtomicBool::new(true)); let gi_producer = TCache::producer("bench_q", 2 << 24); - let mut gi_consumer = gi_producer.cache_ref().consumer().unwrap(); + let mut gi_consumer = gi_producer.cache_ref().consumer("bench").unwrap(); let mut go_producer = TCache::producer("bench_q", 2 << 24); - let go_consumer = go_producer.cache_ref().random_access(false).unwrap(); + let go_consumer = + go_producer.cache_ref().random_access("bench", false).unwrap(); let rpc_in = TCache::producer("bench_q_small", 32); - let rpc_out = rpc_in.cache_ref().random_access(false).unwrap(); + let rpc_out = rpc_in.cache_ref().random_access("bench", false).unwrap(); let (mut server_tile, server_id) = { let secret = secp256k1::SecretKey::new(&mut rng); @@ -119,11 +120,12 @@ pub fn broadcast(c: &mut Criterion) { Endpoint::new(Arc::new(EndpointConfig::default()), None, false, None); let gi_producer = TCache::producer("bench_q", 2 << 24); - let gi_consumer = gi_producer.cache_ref().consumer().unwrap(); + let gi_consumer = gi_producer.cache_ref().consumer("bench").unwrap(); let go_producer = TCache::producer("bench_q_go", 2 << 28); - let go_consumer = go_producer.cache_ref().random_access(false).unwrap(); + let go_consumer = + go_producer.cache_ref().random_access("bench", false).unwrap(); let rpc_in = TCache::producer("bench_q_small", 32); - let rpc_out = rpc_in.cache_ref().random_access(false).unwrap(); + let rpc_out = rpc_in.cache_ref().random_access("bench", false).unwrap(); let context = Context { gossip_producer: gi_producer, diff --git a/crates/network/src/p2p/quic/peer.rs b/crates/network/src/p2p/quic/peer.rs index fd4f565..dd25d67 100644 --- a/crates/network/src/p2p/quic/peer.rs +++ b/crates/network/src/p2p/quic/peer.rs @@ -542,13 +542,14 @@ mod tests { impl PeerHarness { fn new() -> Self { let gossip_in_p = TCache::producer("gossip_in", TCACHE_BYTES); - let gossip_in_c = gossip_in_p.cache_ref().consumer().unwrap(); + let gossip_in_c = gossip_in_p.cache_ref().consumer("peer_gossip_in").unwrap(); let gossip_out_p = TCache::producer("gossip_out", TCACHE_BYTES); - let gossip_out_c = gossip_out_p.cache_ref().random_access(false).unwrap(); + let gossip_out_c = + gossip_out_p.cache_ref().random_access("peer_gossip_out", false).unwrap(); let rpc_in_p = TCache::producer("rpc_in", TCACHE_BYTES); let rpc_out_p = TCache::producer("rpc_out", TCACHE_BYTES); - let rpc_out_c = rpc_out_p.cache_ref().random_access(false).unwrap(); + let rpc_out_c = rpc_out_p.cache_ref().random_access("peer_rpc_out", false).unwrap(); Self { context: Context { diff --git a/crates/surfer/src/discovery.rs b/crates/surfer/src/discovery.rs index ee9867c..43221ac 100644 --- a/crates/surfer/src/discovery.rs +++ b/crates/surfer/src/discovery.rs @@ -8,7 +8,7 @@ //! - `timing-{name}` / `latency-{name}` — flux MPMC `TimingMessage` queues. //! - `tilemetrics-{name}` — flux SPMC `TileSample` queue. -use std::{fs, io, path::PathBuf}; +use std::{collections::HashMap, fs, io, path::PathBuf}; pub struct DiscoveredSources { pub counters: Vec, @@ -27,9 +27,12 @@ pub struct CounterFile { } pub struct TimingFile { - /// The `{name}` suffix from `timing-{name}`. + /// The `{name}` shared by `timing-{name}` and `latency-{name}`. pub name: String, - pub path: PathBuf, + /// Path to `timing-{name}` if the file exists. + pub timing_path: Option, + /// Path to `latency-{name}` if the file exists. + pub latency_path: Option, } pub struct TileMetricsFile { @@ -41,7 +44,9 @@ pub struct TileMetricsFile { pub fn discover(base_dir: &std::path::Path, app_name: &str) -> io::Result { let mut counters = Vec::new(); let mut tcaches = Vec::new(); - let mut timings = Vec::new(); + // timing-{name} and latency-{name} are flux Timer file pairs. + // Group by name so a single TimingFile carries both paths. + let mut timing_map: HashMap = HashMap::new(); let mut tilemetrics = Vec::new(); let dir = flux::utils::directories::shmem_dir_queues_with_base(base_dir, app_name); @@ -58,13 +63,28 @@ pub fn discover(base_dir: &std::path::Path, app_name: &str) -> io::Result = timing_map.into_values().collect(); counters.sort_by(|a, b| a.name.cmp(&b.name)); tcaches.sort_by(|a, b| a.name.cmp(&b.name)); timings.sort_by(|a, b| a.name.cmp(&b.name)); diff --git a/crates/surfer/src/main.rs b/crates/surfer/src/main.rs index 8fbc7fc..814af79 100644 --- a/crates/surfer/src/main.rs +++ b/crates/surfer/src/main.rs @@ -81,7 +81,7 @@ fn main() -> io::Result<()> { .filter_map(|f| match TimingSet::open(f) { Ok(t) => Some(t), Err(e) => { - eprintln!("surfer: skipping {}: {e}", f.path.display()); + eprintln!("surfer: skipping {}: {e}", f.name); None } }) diff --git a/crates/surfer/src/render/tcaches_pane.rs b/crates/surfer/src/render/tcaches_pane.rs index db648d0..da3fdbe 100644 --- a/crates/surfer/src/render/tcaches_pane.rs +++ b/crates/surfer/src/render/tcaches_pane.rs @@ -156,7 +156,9 @@ fn draw_chart(f: &mut Frame, area: Rect, app: &App) { let color = TAIL_COLORS[consumer_idx % TAIL_COLORS.len()]; let data: Vec<(f64, f64)> = hist.iter().enumerate().map(|(i, &v)| (i as f64, v as f64)).collect(); - series.push(ChartSeries { name: format!("tail_{consumer_idx}"), color, data }); + let name = set.consumer_name(consumer_idx); + let label = if name.is_empty() { format!("tail_{consumer_idx}") } else { name.to_string() }; + series.push(ChartSeries { name: label, color, data }); } let title = format!(" {} — 1s deltas (bytes/s) ", view.display_name()); diff --git a/crates/surfer/src/render/timings_pane.rs b/crates/surfer/src/render/timings_pane.rs index 49858af..3a62268 100644 --- a/crates/surfer/src/render/timings_pane.rs +++ b/crates/surfer/src/render/timings_pane.rs @@ -7,7 +7,10 @@ use ratatui::{ widgets::{Axis, Block, Borders, Cell, Chart, Dataset, GraphType, Paragraph, Row, Table}, }; -use crate::{app::App, sources::timings::TimingBucket}; +use crate::{ + app::App, + sources::timings::{TimingChannel, TimingSet}, +}; pub fn draw(f: &mut Frame, area: Rect, app: &mut App) { if app.timings.is_empty() { @@ -23,7 +26,7 @@ pub fn draw(f: &mut Frame, area: Rect, app: &mut App) { } if app.drilled_in { - draw_chart(f, area, app); + draw_charts(f, area, app); return; } let rows = Layout::default() @@ -35,13 +38,14 @@ pub fn draw(f: &mut Frame, area: Rect, app: &mut App) { .split(area); draw_table(f, rows[0], app); - draw_chart(f, rows[1], app); + draw_charts(f, rows[1], app); } fn draw_table(f: &mut Frame, area: Rect, app: &mut App) { let block = Block::default().borders(Borders::ALL).title(" timings "); let header = Row::new(vec![ Cell::from("timer"), + Cell::from("kind"), Cell::from("last"), Cell::from("p50 (last bucket)"), Cell::from("p99 (last bucket)"), @@ -55,7 +59,10 @@ fn draw_table(f: &mut Frame, area: Rect, app: &mut App) { .iter() .enumerate() .map(|(i, t)| { - let last = t.last_bucket().unwrap_or_default(); + let kind = kind_marker(t); + let primary = t.primary(); + let last_bucket = primary.and_then(|c| c.last_bucket()).unwrap_or_default(); + let last_ns = primary.map(|c| c.last_ns).unwrap_or(0); let style = if i == app.timings_selection { Style::default().bg(Color::DarkGray).fg(Color::White).add_modifier(Modifier::BOLD) } else { @@ -63,17 +70,19 @@ fn draw_table(f: &mut Frame, area: Rect, app: &mut App) { }; Row::new(vec![ Cell::from(Span::styled(t.name.clone(), style)), - Cell::from(format_ns(t.last_ns)), - Cell::from(format_ns(last.p50_ns)), - Cell::from(format_ns(last.p99_ns)), - Cell::from(format!("{:>10}", last.count)), + Cell::from(kind), + Cell::from(format_ns(last_ns)), + Cell::from(format_ns(last_bucket.p50_ns)), + Cell::from(format_ns(last_bucket.p99_ns)), + Cell::from(format!("{:>10}", last_bucket.count)), ]) .height(1) }) .collect(); let widths = [ - Constraint::Percentage(30), + Constraint::Percentage(35), + Constraint::Length(6), Constraint::Length(12), Constraint::Length(18), Constraint::Length(18), @@ -84,35 +93,101 @@ fn draw_table(f: &mut Frame, area: Rect, app: &mut App) { f.render_stateful_widget(table, area, &mut app.timings_table_state); } -fn draw_chart(f: &mut Frame, area: Rect, app: &mut App) { +fn kind_marker(t: &TimingSet) -> &'static str { + let has_t = t.timing.as_ref().is_some_and(TimingChannel::has_data); + let has_l = t.latency.as_ref().is_some_and(TimingChannel::has_data); + match (has_t, has_l) { + (true, true) => "T+L", + (true, false) => "T", + (false, true) => "L", + (false, false) => "·", + } +} + +fn draw_charts(f: &mut Frame, area: Rect, app: &App) { let Some(timer) = app.timings.get(app.timings_selection) else { f.render_widget(Block::default().borders(Borders::ALL).title(" history "), area); return; }; + + let has_t = timer.timing.as_ref().is_some_and(TimingChannel::has_data); + let has_l = timer.latency.as_ref().is_some_and(TimingChannel::has_data); + match (has_t, has_l) { + (true, true) => { + let rows = Layout::default() + .direction(Direction::Vertical) + .constraints([Constraint::Percentage(50), Constraint::Percentage(50)]) + .split(area); + draw_channel_chart( + f, + rows[0], + &timer.name, + "processing", + Color::Cyan, + timer.timing.as_ref().unwrap(), + ); + draw_channel_chart( + f, + rows[1], + &timer.name, + "latency", + Color::Magenta, + timer.latency.as_ref().unwrap(), + ); + } + (true, false) => draw_channel_chart( + f, + area, + &timer.name, + "processing", + Color::Cyan, + timer.timing.as_ref().unwrap(), + ), + (false, true) => draw_channel_chart( + f, + area, + &timer.name, + "latency", + Color::Magenta, + timer.latency.as_ref().unwrap(), + ), + (false, false) => { + let block = Block::default().borders(Borders::ALL).title(format!(" {} ", timer.name)); + let inner = block.inner(area); + f.render_widget(block, area); + f.render_widget( + Paragraph::new("no events drained yet").style(Style::default().fg(Color::DarkGray)), + inner, + ); + } + } +} + +fn draw_channel_chart( + f: &mut Frame, + area: Rect, + name: &str, + kind: &str, + p50_color: Color, + ch: &TimingChannel, +) { let title = format!( - " {} — p50 / p99 over {}s buckets ", - timer.name, + " {name} — {kind} p50 / p99 over {}s buckets ", crate::sources::counters::BUCKET_SECS ); let block = Block::default().borders(Borders::ALL).title(title); - let buckets: &std::collections::VecDeque = &timer.history; - let live = timer.current_bucket(); + let buckets = &ch.history; + let live = ch.current_bucket(); if buckets.is_empty() && live.is_none() { f.render_widget(block, area); return; } - let mut p50: Vec<(f64, f64)> = buckets - .iter() - .enumerate() - .map(|(i, b)| (i as f64, b.p50_ns as f64 / 1_000.0)) // microseconds - .collect(); + let mut p50: Vec<(f64, f64)> = + buckets.iter().enumerate().map(|(i, b)| (i as f64, b.p50_ns as f64 / 1_000.0)).collect(); let mut p99: Vec<(f64, f64)> = buckets.iter().enumerate().map(|(i, b)| (i as f64, b.p99_ns as f64 / 1_000.0)).collect(); - - // Append the live in-progress bucket as the rightmost point so the - // chart updates on every UI tick rather than only on bucket roll. if let Some(live) = live { let x = buckets.len() as f64; p50.push((x, live.p50_ns as f64 / 1_000.0)); @@ -127,13 +202,13 @@ fn draw_chart(f: &mut Frame, area: Rect, app: &mut App) { Dataset::default() .name("p50 µs") .marker(symbols::Marker::Braille) - .style(Style::default().fg(Color::Cyan)) + .style(Style::default().fg(p50_color)) .graph_type(GraphType::Line) .data(&p50), Dataset::default() .name("p99 µs") .marker(symbols::Marker::Braille) - .style(Style::default().fg(Color::Magenta)) + .style(Style::default().fg(Color::Yellow)) .graph_type(GraphType::Line) .data(&p99), ]; @@ -155,7 +230,7 @@ fn draw_chart(f: &mut Frame, area: Rect, app: &mut App) { .y_axis( Axis::default() .bounds([0.0, y_max * 1.1]) - .labels(vec![Line::from("0"), Line::from(format!("{:.0}µs", y_max))]) + .labels(vec![Line::from("0"), Line::from(format!("{y_max:.0}µs"))]) .style(Style::default().fg(Color::DarkGray)), ); f.render_widget(chart, area); diff --git a/crates/surfer/src/sources/counters.rs b/crates/surfer/src/sources/counters.rs index 8742dd5..a672e2c 100644 --- a/crates/surfer/src/sources/counters.rs +++ b/crates/surfer/src/sources/counters.rs @@ -23,6 +23,9 @@ use crate::discovery::CounterFile; pub const BUCKET_SECS: u64 = 1; /// 1 s bucket × 240 = 4 minutes of history. pub const BUCKET_HISTORY_LEN: usize = 240; +/// Per-consumer name buffer size (matches +/// `silver_common::spine::tcache::metrics::NAME_LEN`). +const CONSUMER_NAME_LEN: usize = 32; pub struct CounterSet { pub name: String, @@ -31,6 +34,12 @@ pub struct CounterSet { /// are positional fallbacks because no schema was wired up /// (`false`). pub schema_registered: bool, + /// Companion read-only mmap into `tcache-names-{name}` when the + /// CounterSet is a tcache. `n_consumers × 32` bytes of zero-padded + /// UTF-8. `None` for non-tcache counters and for tcaches whose + /// names file couldn't be opened. + consumer_names_base: *const u8, + consumer_names_bytes: usize, base: *const AtomicU64, slot_count: usize, map_bytes: usize, @@ -64,10 +73,33 @@ impl CounterSet { let map_bytes = slot_count * 8; let base = mmap_readonly(&file.path, map_bytes)?; let (slot_names, schema_registered) = crate::schema::names_for(&file.name, slot_count); + + // For tcache counters, try to open the companion names file + // `tcache-names-{tcache_name}`. Failure is non-fatal — surfer + // continues to render with positional labels. + let (consumer_names_base, consumer_names_bytes) = + if let Some(tc_name) = file.name.strip_prefix("tcache-") { + let n_consumers = slot_count.saturating_sub(2); + let names_bytes = n_consumers * CONSUMER_NAME_LEN; + let names_path = file + .path + .parent() + .map(|d| d.join(format!("tcache-names-{tc_name}"))) + .unwrap_or_default(); + match mmap_readonly_bytes(&names_path, names_bytes) { + Ok(p) => (p, names_bytes), + Err(_) => (std::ptr::null(), 0), + } + } else { + (std::ptr::null(), 0) + }; + Ok(Self { name: file.name.clone(), slot_names, schema_registered, + consumer_names_base, + consumer_names_bytes, base, slot_count, map_bytes, @@ -79,6 +111,26 @@ impl CounterSet { }) } + /// Return the consumer name string for tail slot `consumer_idx` + /// (= slot index minus 2 for tcaches). Empty string when the + /// names file isn't open or the slot is uninitialised. + pub fn consumer_name(&self, consumer_idx: usize) -> &str { + if self.consumer_names_base.is_null() { + return ""; + } + let off = consumer_idx * CONSUMER_NAME_LEN; + if off + CONSUMER_NAME_LEN > self.consumer_names_bytes { + return ""; + } + // SAFETY: bounds checked above; bytes are written by the + // producer side under a zero-padded UTF-8 convention. + let bytes = unsafe { + std::slice::from_raw_parts(self.consumer_names_base.add(off), CONSUMER_NAME_LEN) + }; + let end = bytes.iter().position(|&b| b == 0).unwrap_or(CONSUMER_NAME_LEN); + std::str::from_utf8(&bytes[..end]).unwrap_or("") + } + /// Read all slots into `current`, after copying the previous tick's /// values into `previous`. O(slot_count) atomic loads. pub fn sample(&mut self) { @@ -139,15 +191,39 @@ impl CounterSet { impl Drop for CounterSet { fn drop(&mut self) { - // SAFETY: `base` was returned by mmap with `map_bytes`; nothing - // else holds a reference into it (we hand out only borrowed - // slices via accessors that don't outlive `&self`). + // SAFETY: both pointers came from `mmap` with the recorded sizes; + // nothing else holds references into them (we hand out only + // borrowed slices via accessors that don't outlive `&self`). unsafe { libc::munmap(self.base as *mut libc::c_void, self.map_bytes); + if !self.consumer_names_base.is_null() { + libc::munmap( + self.consumer_names_base as *mut libc::c_void, + self.consumer_names_bytes, + ); + } } } } +fn mmap_readonly_bytes(path: &Path, bytes: usize) -> io::Result<*const u8> { + let file = OpenOptions::new().read(true).open(path)?; + let ptr = unsafe { + libc::mmap( + std::ptr::null_mut(), + bytes, + libc::PROT_READ, + libc::MAP_SHARED, + file.as_raw_fd(), + 0, + ) + }; + if ptr == libc::MAP_FAILED { + return Err(io::Error::last_os_error()); + } + Ok(ptr.cast::()) +} + #[cfg(test)] mod tests { use silver_common::declare_counters; diff --git a/crates/surfer/src/sources/timings.rs b/crates/surfer/src/sources/timings.rs index eaef455..28a068d 100644 --- a/crates/surfer/src/sources/timings.rs +++ b/crates/surfer/src/sources/timings.rs @@ -1,10 +1,14 @@ -//! Consumer for a flux `timing-{name}` shmem queue. Drains -//! `TimingMessage` events into a running `hdrhistogram::Histogram` -//! (recording the elapsed-nanos per event). On bucket roll the -//! histogram's percentiles are snapshotted into a 240-deep ring and -//! the histogram is reset. +//! Consumer for flux `timing-{name}` / `latency-{name}` shmem queue +//! pairs. Each `TimingSet` covers one flux `Timer` instance — flux +//! always creates both queue files, but a given Timer may only emit +//! to one side (e.g. `#[timed]` writes only processing, tcache +//! consumer timers write only latency). Channels with no observed +//! events render as "no data" instead of empty graphs. +//! +//! On `roll_bucket` each channel's running histogram is snapshotted +//! into its 240-deep ring and reset. -use std::collections::VecDeque; +use std::{collections::VecDeque, path::Path}; use flux::communication::{ queue::{ConsumerBare, Queue}, @@ -14,7 +18,7 @@ use hdrhistogram::Histogram; use crate::{discovery::TimingFile, sources::counters::BUCKET_HISTORY_LEN}; -/// Bucket snapshot of one timer's distribution. +/// Bucket snapshot of one channel's distribution. #[derive(Clone, Copy, Debug, Default)] pub struct TimingBucket { pub count: u64, @@ -22,33 +26,22 @@ pub struct TimingBucket { pub p99_ns: u64, } -pub struct TimingSet { - pub name: String, +/// One side of a flux `Timer` — either processing or latency. +pub struct TimingChannel { consumer: ConsumerBare, - /// Running histogram for the *current* bucket. hist: Histogram, - /// Last successfully decoded elapsed-nanos value, for the live - /// column. pub last_ns: u64, - /// Total events drained since open. pub total_count: u64, - /// Ring of completed-bucket snapshots (newest at back). pub history: VecDeque, } -impl TimingSet { - pub fn open(file: &TimingFile) -> Result { - // flux `Queue::open_shared` panics on failure; use try_open - // to surface the error cleanly. - let queue: Queue = Queue::try_open_shared(&file.path) - .map_err(|e| format!("open_shared({:?}): {e:?}", file.path))?; - let label: &'static str = Box::leak(format!("surfer-{}", file.name).into_boxed_str()); - // `try_consume` lazily inits the broadcast cursor on first call. +impl TimingChannel { + fn open(path: &Path, label: &'static str) -> Result { + let queue: Queue = + Queue::try_open_shared(path).map_err(|e| format!("open_shared({path:?}): {e:?}"))?; let consumer = ConsumerBare::::new(queue, label); Ok(Self { - name: file.name.clone(), consumer, - // 1 ns .. 60 s, 3 significant digits. hist: Histogram::::new_with_bounds(1, 60_000_000_000, 3).expect("hdrhist bounds"), last_ns: 0, total_count: 0, @@ -56,7 +49,6 @@ impl TimingSet { }) } - /// Drain everything currently available in the queue. pub fn drain(&mut self) { let mut msg = TimingMessage::default(); while self.consumer.try_consume(&mut msg).is_ok() { @@ -66,12 +58,10 @@ impl TimingSet { let ns = msg.elapsed().0; self.last_ns = ns; self.total_count += 1; - // Saturate at histogram upper bound rather than fail. self.hist.saturating_record(ns); } } - /// Snapshot p50/p99 + count into the bucket ring, then reset. pub fn roll_bucket(&mut self) { let bucket = TimingBucket { count: self.hist.len(), @@ -89,8 +79,8 @@ impl TimingSet { self.history.back().copied() } - /// Live in-progress bucket aggregates — drained on every UI tick. - /// Returns `None` when the running histogram is empty. + /// Live in-progress bucket — `None` when the running histogram is + /// empty. Used to extend the chart with a per-tick point. pub fn current_bucket(&self) -> Option { if self.hist.is_empty() { None @@ -102,4 +92,62 @@ impl TimingSet { }) } } + + /// `true` once at least one event has been drained — used by the + /// pane to decide whether to render a chart for this channel. + pub fn has_data(&self) -> bool { + self.total_count > 0 + } +} + +pub struct TimingSet { + pub name: String, + pub timing: Option, + pub latency: Option, +} + +impl TimingSet { + pub fn open(file: &TimingFile) -> Result { + let timing = if let Some(path) = &file.timing_path { + let label: &'static str = Box::leak(format!("surfer-t-{}", file.name).into_boxed_str()); + TimingChannel::open(path, label).ok() + } else { + None + }; + let latency = if let Some(path) = &file.latency_path { + let label: &'static str = Box::leak(format!("surfer-l-{}", file.name).into_boxed_str()); + TimingChannel::open(path, label).ok() + } else { + None + }; + if timing.is_none() && latency.is_none() { + return Err(format!("no openable queue for {}", file.name)); + } + Ok(Self { name: file.name.clone(), timing, latency }) + } + + pub fn drain(&mut self) { + if let Some(c) = &mut self.timing { + c.drain(); + } + if let Some(c) = &mut self.latency { + c.drain(); + } + } + + pub fn roll_bucket(&mut self) { + if let Some(c) = &mut self.timing { + c.roll_bucket(); + } + if let Some(c) = &mut self.latency { + c.roll_bucket(); + } + } + + /// Channel preferred for the table row's summary stats: latency + /// when present (more interesting for spine/tcache timers), else + /// timing. + pub fn primary(&self) -> Option<&TimingChannel> { + self.latency.as_ref().filter(|c| c.has_data()).or(self.timing.as_ref()) + } }