Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/beacon_state/src/tile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1710,8 +1710,8 @@ mod tests {
let ticker = SlotTicker::new(genesis, Duration::from_secs(12), Duration::from_secs(4));
let gossip_p = TCache::producer("test_gossip", 1 << 20);
let event_p = TCache::producer("test_event", 1 << 20);
let gossip_c = gossip_p.cache_ref().random_access(true).unwrap();
let rpc_c = event_p.cache_ref().random_access(true).unwrap();
let gossip_c = gossip_p.cache_ref().random_access("test_gossip", true).unwrap();
let rpc_c = event_p.cache_ref().random_access("test_event", true).unwrap();
BeaconStateTile::new_heap(ticker, SpecConfig::mainnet(), gossip_c, rpc_c, &[])
}

Expand Down
4 changes: 2 additions & 2 deletions crates/beacon_state/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,8 @@ impl Harness {
let gossip_in_producer = TCache::producer("gossip_in", 1 << 24);
let rpc_in_producer = TCache::producer("rpc_in", 1 << 24);
let gossip_consumer =
gossip_in_producer.cache_ref().random_access(true).expect("gossip ra");
let rpc_consumer = rpc_in_producer.cache_ref().random_access(true).expect("rpc ra");
gossip_in_producer.cache_ref().random_access("test", true).expect("gossip ra");
let rpc_consumer = rpc_in_producer.cache_ref().random_access("test", true).expect("rpc ra");

let tile = build_tile(ticker, gossip_consumer, rpc_consumer);

Expand Down
15 changes: 10 additions & 5 deletions crates/bin/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,17 @@ fn main() -> Result<(), Box<dyn Error>> {
// TCaches
let incoming_gossip_producer =
TCache::producer("incoming_gossip", config.incoming_gossip_tcache_size());
let incoming_gossip_consumer = incoming_gossip_producer.cache_ref().consumer()?;
let incoming_gossip_consumer =
incoming_gossip_producer.cache_ref().consumer("incoming_gossip")?;
let ssz_gossip_producer =
TCache::producer("ssz_gossip", config.incoming_gossip_ssz_tcache_size());
let ssz_gossip_consumer = ssz_gossip_producer.cache_ref().random_access(true)?;
let ssz_gossip_consumer =
ssz_gossip_producer.cache_ref().random_access("bs_ssz_gossip", true)?;
let outgoing_gossip_producer =
TCache::producer("outgoing_gossip", config.outgoing_gossip_tcache_size());
let incoming_rpc_producer = TCache::producer("incoming_rpc", config.incoming_rpc_tcache_size());
let incoming_rpc_consumer = incoming_rpc_producer.cache_ref().random_access(true)?;
let incoming_rpc_consumer =
incoming_rpc_producer.cache_ref().random_access("bs_incoming_rpc", true)?;

// rpc producer
let outgoing_rpc_producer = TCache::producer("outgoing_rpc", config.outgoing_rpc_tcache_size());
Expand Down Expand Up @@ -86,9 +89,11 @@ fn main() -> Result<(), Box<dyn Error>> {
);
let p2p_context = Context {
gossip_producer: incoming_gossip_producer,
gossip_consumer: outgoing_gossip_producer.cache_ref().random_access(true)?,
gossip_consumer: outgoing_gossip_producer
.cache_ref()
.random_access("p2p_outgoing_gossip", true)?,
rpc_producer: incoming_rpc_producer,
rpc_consumer: outgoing_rpc_producer.cache_ref().random_access(true)?,
rpc_consumer: outgoing_rpc_producer.cache_ref().random_access("p2p_outgoing_rpc", true)?,
identify: Some(ProtoIdentify::from((&config.identify()?, &keypair))),
};

Expand Down
41 changes: 36 additions & 5 deletions crates/common/src/spine/tcache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ impl TCache {
self.name
}

pub fn consumer(&self) -> Result<Consumer, Error> {
pub fn consumer(&self, name: &'static str) -> Result<Consumer, Error> {
// find start seq
let seq = self.head.seq.load(Ordering::Acquire);

Expand All @@ -116,16 +116,22 @@ impl TCache {
// Publish a baseline tail so surfer's chart has a value to plot
// before the consumer has actually called free.
self.record_tail(index, seq);
self.record_consumer_name(index, name);

Ok(Consumer {
cache: TCacheRef { cache: addr_of!(*self) as *const c_void },
index,
seq,
next_seq: seq,
timer: self.create_consumer_timer(name),
})
}

pub fn random_access(&self, auto_free: bool) -> Result<RandomAccessConsumer, Error> {
pub fn random_access(
&self,
name: &'static str,
auto_free: bool,
) -> Result<RandomAccessConsumer, Error> {
// find start seq
let seq = self.head.seq.load(Ordering::Acquire);

Expand All @@ -139,15 +145,40 @@ impl TCache {
.ok_or(Error::MaxConsumers)?;

self.record_tail(index, seq);
self.record_consumer_name(index, name);

Ok(RandomAccessConsumer {
cache: TCacheRef { cache: addr_of!(*self) as *const c_void },
index,
active: Buckets::new(32 * 1024, self.len as u64),
auto_free,
timer: self.create_consumer_timer(name),
})
}

#[inline]
fn record_consumer_name(&self, idx: usize, name: &'static str) {
if let Some(m) = &self.metrics {
m.set_consumer_name(idx, name);
}
}

/// Build a per-consumer flux Timer named
/// `tcache-{tcache_name}-{consumer_name}`. Result file paths are
/// `timing-tcache-{tcache_name}-{consumer_name}` and
/// `latency-tcache-{tcache_name}-{consumer_name}` under flux's
/// `shmem/queues` directory — surfer picks up the latency queue
/// via the existing timings discovery.
///
/// Returns `None` when this TCache has no metrics (unnamed or
/// metrics file open failed) so the consumer's read path is a
/// no-op for latency.
fn create_consumer_timer(&self, consumer_name: &'static str) -> Option<flux::Timer> {
self.metrics.as_ref()?;
let label = format!("tcache-{}-{}", self.name, consumer_name);
Some(flux::Timer::new("silver", &label))
}

fn index(&self, seq: u64) -> usize {
(seq & (self.len - 1) as u64) as usize
}
Expand Down Expand Up @@ -623,7 +654,7 @@ mod tests {

let mut producer = TCache::producer("test_tcache", TCACHE_SIZE);
let mut consumers: Vec<Consumer> =
(0..CONSUMERS).map(|_| producer.cache_ref().consumer().unwrap()).collect();
(0..CONSUMERS).map(|_| producer.cache_ref().consumer("test").unwrap()).collect();

let done = Arc::new(AtomicBool::new(false));

Expand Down Expand Up @@ -677,7 +708,7 @@ mod tests {

let mp = TCache::multi_producer("test_mp", TCACHE_SIZE);
let mut consumers: Vec<Consumer> =
(0..CONSUMERS).map(|_| mp.cache_ref().consumer().unwrap()).collect();
(0..CONSUMERS).map(|_| mp.cache_ref().consumer("test").unwrap()).collect();

// Spawn consumers BEFORE producers so they observe the full stream
// from seq 0 and don't miss early commits.
Expand Down Expand Up @@ -739,7 +770,7 @@ mod tests {
#[test]
fn produce_consume() {
let mut producer = TCache::producer("test_buckets", 2 << 14);
let mut consumer = producer.cache_ref().consumer().unwrap();
let mut consumer = producer.cache_ref().consumer("test").unwrap();

let prod = std::thread::spawn(move || {
let mut rng = rand::thread_rng();
Expand Down
26 changes: 21 additions & 5 deletions crates/common/src/spine/tcache/consumer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{ops::Deref, sync::atomic::Ordering};

use flux::timing::Nanos;
use flux::{Timer, timing::Nanos};

use crate::{GossipMsgOut, TCacheError, TCacheRef};

Expand Down Expand Up @@ -54,6 +54,11 @@ pub struct Consumer {
pub(super) index: usize,
pub(super) seq: u64,
pub(super) next_seq: u64,
/// Per-consumer flux Timer emitting `latency-tcache-{tcache}-{name}`
/// — measures elapsed from `slot.reserve_ns` to first read.
/// `None` if the TCache wasn't named or the Timer queue couldn't
/// be opened.
pub(super) timer: Option<Timer>,
}

impl Consumer {
Expand All @@ -66,6 +71,9 @@ impl Consumer {
})?;

if !data.is_empty() {
if let Some(timer) = &mut self.timer {
timer.emit_latency_from_nanos(ts, Nanos::now());
}
return Ok((data, ts));
}
self.seq = self.next_seq;
Expand All @@ -83,7 +91,7 @@ impl Consumer {
}

/// Consumer that supports random access to messages between its tail and buffer
/// head. Tail is tracked externally.
/// head. Tail is tracked externally.
pub struct RandomAccessConsumer {
pub(super) cache: TCacheRef,
pub(super) index: usize,
Expand All @@ -92,11 +100,19 @@ pub struct RandomAccessConsumer {
// If set `free` is called on drop of every acquired read, ensuring the
// consumer tail is published.
pub(super) auto_free: bool,
/// Per-consumer flux Timer emitting `latency-tcache-{tcache}-{name}`
/// — measures elapsed from `slot.reserve_ns` at acquire time.
pub(super) timer: Option<Timer>,
}

impl RandomAccessConsumer {
pub fn acquire(&mut self, read: TCacheRead) -> AcquiredRead {
self.active.acquire(read.seq);
if let Some(timer) = &mut self.timer {
if let Ok(reserve_ns) = read.cache_ts() {
timer.emit_latency_from_nanos(reserve_ns, Nanos::now());
}
}
AcquiredRead { consumer: self as *const Self, read }
}

Expand Down Expand Up @@ -337,7 +353,7 @@ mod tests {
#[test]
fn acquire_release_cycle_reads_buffer() {
let mut producer = TCache::producer("test_consumer", 1 << 16);
let mut consumer = producer.cache_ref().random_access(false).unwrap();
let mut consumer = producer.cache_ref().random_access("test", false).unwrap();
producer.publish_head();

let reads: Vec<TCacheRead> =
Expand Down Expand Up @@ -367,7 +383,7 @@ mod tests {
const TOTAL: usize = 1000;

let mut producer = TCache::producer("test_consumer", CACHE);
let mut consumer = producer.cache_ref().random_access(false).unwrap();
let mut consumer = producer.cache_ref().random_access("test", false).unwrap();
producer.publish_head();

let mut held: Vec<AcquiredRead> = Vec::new();
Expand Down Expand Up @@ -403,7 +419,7 @@ mod tests {
const MSG_LEN: usize = 8 * 1024;

let mut producer = TCache::producer("test_consumer", CACHE);
let mut consumer = producer.cache_ref().random_access(false).unwrap();
let mut consumer = producer.cache_ref().random_access("test", false).unwrap();
producer.publish_head();

// The "victim" — held across heavy downstream production.
Expand Down
97 changes: 86 additions & 11 deletions crates/common/src/spine/tcache/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,16 @@
//! 1: head_seq — updated by `Producer::publish_head`.
//! 2..2+N: tail_seq[i] — updated by consumer `i`'s `free`.
//!
//! Surfer renders the file as a ring-buffer occupancy bar
//! (head − min_tail / capacity) plus current length, decoded
//! dynamically from the file's slot count.
//! Companion file `tcache-names-{name}` stores per-consumer names
//! as fixed-width zero-padded UTF-8 buffers (`MAX_CONSUMERS × NAME_LEN`
//! bytes). Each consumer writes its name at registration time so
//! surfer can label rows by consumer identity instead of positional
//! `tail_{i}`.

use std::{
fs::OpenOptions,
io,
os::fd::AsRawFd,
sync::atomic::{AtomicU64, Ordering},
};

Expand All @@ -22,11 +26,19 @@ const FIXED_SLOTS: usize = 2;
const CAPACITY_IDX: usize = 0;
/// Index of the head_seq slot.
const HEAD_IDX: usize = 1;
/// Bytes per consumer-name slot. UTF-8 zero-padded; tail bytes zero.
/// 32 fits typical labels like `data_columns_gossip` with headroom.
pub const NAME_LEN: usize = 32;

pub struct TCacheMetrics {
base: *mut AtomicU64,
n_consumers: usize,
map_bytes: usize,
/// Companion mmap for consumer names. `n_consumers * NAME_LEN`
/// bytes. `None` if the names file couldn't be opened (e.g.
/// permissions); name writes become no-ops.
names_base: *mut u8,
names_bytes: usize,
}

// SAFETY: `base` points at an `mmap(MAP_SHARED)` region; every access
Expand All @@ -36,15 +48,17 @@ unsafe impl Send for TCacheMetrics {}
unsafe impl Sync for TCacheMetrics {}

impl TCacheMetrics {
/// mmap the `counters-tcache-{name}` file under
/// mmap the `counters-tcache-{name}` file and companion
/// `tcache-names-{name}` file under
/// `flux::utils::directories::local_share_dir() / "silver" / shmem /
/// queues`, pre-populate the capacity slot.
/// queues`. Pre-populates the capacity slot, fills tail slots with the
/// `u64::MAX` sentinel, and zeroes the names buffer.
pub fn new(name: &str, n_consumers: usize, capacity: u64) -> io::Result<Self> {
let slots = FIXED_SLOTS + n_consumers;
let map_bytes = slots * std::mem::size_of::<AtomicU64>();
let file_name = format!("tcache-{name}");
let counters_file = format!("tcache-{name}");
let base_dir = flux::utils::directories::local_share_dir();
let base = mmap_counters_file(&base_dir, "silver", &file_name, map_bytes)?;
let base = mmap_counters_file(&base_dir, "silver", &counters_file, map_bytes)?;
// SAFETY: ptr is valid for `slots` AtomicU64s.
unsafe {
(*base.add(CAPACITY_IDX)).store(capacity, Ordering::Relaxed);
Expand All @@ -55,7 +69,15 @@ impl TCacheMetrics {
(*base.add(FIXED_SLOTS + i)).store(u64::MAX, Ordering::Relaxed);
}
}
Ok(Self { base, n_consumers, map_bytes })

let names_bytes = n_consumers * NAME_LEN;
let names_file = format!("tcache-names-{name}");
let names_base = mmap_bytes_file(&base_dir, "silver", &names_file, names_bytes)?;
// SAFETY: pointer is valid for `names_bytes`. Zero on init so
// unset slots have empty names.
unsafe { std::ptr::write_bytes(names_base, 0, names_bytes) };

Ok(Self { base, n_consumers, map_bytes, names_base, names_bytes })
}

#[inline]
Expand All @@ -72,12 +94,65 @@ impl TCacheMetrics {
(*self.base.add(FIXED_SLOTS + consumer_idx)).store(v, Ordering::Relaxed);
}
}

/// Write a consumer's name into its slot. Truncated to `NAME_LEN`
/// bytes. Trailing bytes are zero-padded so readers can find the
/// terminator.
pub fn set_consumer_name(&self, idx: usize, name: &str) {
debug_assert!(idx < self.n_consumers);
if idx >= self.n_consumers {
return;
}
let bytes = name.as_bytes();
let len = bytes.len().min(NAME_LEN);
// SAFETY: idx bounded by `n_consumers`; `idx * NAME_LEN + NAME_LEN`
// is within `names_bytes`.
unsafe {
let dst = self.names_base.add(idx * NAME_LEN);
std::ptr::write_bytes(dst, 0, NAME_LEN);
std::ptr::copy_nonoverlapping(bytes.as_ptr(), dst, len);
}
}
}

impl Drop for TCacheMetrics {
fn drop(&mut self) {
// SAFETY: `base` was returned by mmap with `map_bytes`; no other
// references into this region outlive `self`.
unsafe { libc::munmap(self.base.cast::<libc::c_void>(), self.map_bytes) };
// SAFETY: both pointers came from `mmap` with the recorded sizes;
// no other references into either region outlive `self`.
unsafe {
libc::munmap(self.base.cast::<libc::c_void>(), self.map_bytes);
libc::munmap(self.names_base.cast::<libc::c_void>(), self.names_bytes);
}
}
}

/// Open/create + mmap a raw bytes shmem file under flux's
/// `shmem/queues` directory. Layered alongside `mmap_counters_file`
/// for non-u64 layouts.
fn mmap_bytes_file(
base_dir: &std::path::Path,
app_name: &str,
file_name: &str,
bytes: usize,
) -> io::Result<*mut u8> {
let dir = flux::utils::directories::shmem_dir_queues_with_base(base_dir, app_name);
std::fs::create_dir_all(&dir)?;
let path = dir.join(file_name);
let file =
OpenOptions::new().read(true).write(true).create(true).truncate(false).open(&path)?;
file.set_len(bytes as u64)?;
let ptr = unsafe {
libc::mmap(
std::ptr::null_mut(),
bytes,
libc::PROT_READ | libc::PROT_WRITE,
libc::MAP_SHARED,
file.as_raw_fd(),
0,
)
};
if ptr == libc::MAP_FAILED {
return Err(io::Error::last_os_error());
}
Ok(ptr.cast::<u8>())
}
Loading
Loading