Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions crates/beacon_state/src/tile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use flux::{
use silver_common::{
BeaconStateEvent, GossipTopic, NewGossipMsg, P2pStreamId, PeerEvent, RejectSource, RpcInbound,
RpcMsg, RpcResponse, RpcResponseInbound, RpcSeverity, SilverSpine, SpecConfig, SyncUpdate,
TCacheRead, TRandomAccess,
TRandomAccess, TRead,
ssz_view::{
AttesterSlashingView, PROPOSER_SLASHING_SIZE, ProposerSlashingView, SIGNED_BLS_CHANGE_SIZE,
SIGNED_VOLUNTARY_EXIT_SIZE, SINGLE_ATT_SIZE, STATUS_V2_SIZE, SignedAggregateAndProofView,
Expand Down Expand Up @@ -536,7 +536,7 @@ impl BeaconStateTile {
fn apply_block(
&mut self,
data: &[u8],
data_tcache: TCacheRead,
data_tcache: TRead,
source: RejectSource,
producers: &mut Producers,
) -> Feedback {
Expand All @@ -560,7 +560,7 @@ impl BeaconStateTile {
return f;
}

producers.produce(BeaconStateEvent::PersistBlock(data_tcache));
producers.produce(BeaconStateEvent::PersistBlock(data_tcache.read));

let head_changed = self.last_applied != prev_last_applied;
let new_finalized =
Expand Down Expand Up @@ -762,7 +762,8 @@ impl BeaconStateTile {
fn handle_gossip(&mut self, m: NewGossipMsg, data: &[u8], producers: &mut Producers) {
let feedback = match m.topic {
GossipTopic::BeaconBlock => {
Some(self.apply_block(data, m.ssz, RejectSource::Gossip, producers))
let acquired = self.gossip_consumer.acquire(m.ssz);
Some(self.apply_block(data, acquired, RejectSource::Gossip, producers))
}
GossipTopic::BeaconAttestation(_) => Some(self.handle_attestation(data)),
GossipTopic::BeaconAggregateAndProof => Some(self.handle_aggregate_and_proof(data)),
Expand Down Expand Up @@ -794,7 +795,7 @@ impl BeaconStateTile {
msg: RpcMsg,
sender: P2pStreamId,
data: &[u8],
data_tcache: TCacheRead,
data_tcache: TRead,
producers: &mut Producers,
) {
if let RpcMsg::BlocksRangeResp(_) = msg {
Expand Down Expand Up @@ -1581,7 +1582,7 @@ impl Tile<SilverSpine> for BeaconStateTile {
}
});

adapter.consume(|m: RpcInbound, producers| {
adapter.consume_one(|m: RpcInbound, producers| {
if let RpcInbound::Response(RpcResponseInbound {
application_id: _,
stream_id,
Expand All @@ -1597,7 +1598,7 @@ impl Tile<SilverSpine> for BeaconStateTile {
RpcMsg::BlocksRangeResp(SignedBeaconBlockView),
stream_id,
unsafe { &*p },
ssz,
acquired,
producers,
);
}
Expand Down
4 changes: 2 additions & 2 deletions crates/common/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ pub struct Config {
outgoing_gossip_tcache_size: usize,
#[serde(default = "default_usize::<33554432>")] // 2 << 24
incoming_gossip_ssz_tcache_size: usize,
#[serde(default = "default_usize::<33554432>")] // 2 << 24
#[serde(default = "default_usize::<67108864>")] // 2 << 25
incoming_rpc_tcache_size: usize,
#[serde(default = "default_usize::<33554432>")] // 2 << 24
outgoing_rpc_tcache_size: usize,
Expand Down Expand Up @@ -114,7 +114,7 @@ impl Config {
incoming_gossip_tcache_size: 2 << 24, // protobuf
outgoing_gossip_tcache_size: 2 << 24, // protobuf
incoming_gossip_ssz_tcache_size: 2 << 24, // ssz
incoming_rpc_tcache_size: 2 << 24, // ssz
incoming_rpc_tcache_size: 2 << 25, // ssz
outgoing_rpc_tcache_size: 2 << 24, // ssz
}
}
Expand Down
42 changes: 24 additions & 18 deletions crates/common/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,27 +73,16 @@ impl Drop for TimerGuard {
}

/// Open / create the counters file, ftruncate to `bytes`, mmap shared,
/// and publish the base pointer to `target`. No-op if `target` is
/// already non-null (idempotent across repeat `init()` calls).
///
/// Counter files land in flux's standard shmem-queues directory —
/// `{base_dir}/{app_name}/shmem/queues/counters-{file_name}` — so a
/// single observer-side scan picks up both flux queues and counter
/// files in one walk.
///
/// Called by macro-generated `init()` functions; not intended for
/// direct use.
pub fn map_counters(
/// and return the base pointer. Counter files land in flux's standard
/// shmem-queues directory —
/// `{base_dir}/{app_name}/shmem/queues/counters-{file_name}` — so a single
/// observer-side scan picks up both flux queues and counter files in one walk.
pub fn mmap_counters_file(
base_dir: &Path,
app_name: &str,
file_name: &str,
bytes: usize,
target: &AtomicPtr<AtomicU64>,
) -> io::Result<()> {
if !target.load(Ordering::Relaxed).is_null() {
return Ok(());
}

) -> io::Result<*mut AtomicU64> {
let dir = flux::utils::directories::shmem_dir_queues_with_base(base_dir, app_name);
std::fs::create_dir_all(&dir)?;
let path = dir.join(format!("counters-{file_name}"));
Expand All @@ -116,7 +105,24 @@ pub fn map_counters(
return Err(io::Error::last_os_error());
}

target.store(ptr.cast::<AtomicU64>(), Ordering::Release);
Ok(ptr.cast::<AtomicU64>())
}

/// As `mmap_counters_file` but publishes the result into a
/// `AtomicPtr` target — used by the static-pointer pattern emitted by
/// `declare_counters!`. No-op if `target` is already non-null.
pub fn map_counters(
base_dir: &Path,
app_name: &str,
file_name: &str,
bytes: usize,
target: &AtomicPtr<AtomicU64>,
) -> io::Result<()> {
if !target.load(Ordering::Relaxed).is_null() {
return Ok(());
}
let ptr = mmap_counters_file(base_dir, app_name, file_name, bytes)?;
target.store(ptr, Ordering::Release);
Ok(())
}

Expand Down
51 changes: 50 additions & 1 deletion crates/common/src/spine/tcache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{
};

pub use consumer::{AcquiredRead, Consumer, RandomAccessConsumer, TCacheRead};
use flux::timing::Nanos;
use flux::{timing::Nanos, tracing};
pub use producer::{MultiProducer, Producer, Reservation, TCacheProducer};
use thiserror::Error;

Expand All @@ -20,8 +20,11 @@ const MAX_CONSUMERS: usize = 64;
const ALIGN: usize = size_of::<Slot>();

mod consumer;
mod metrics;
mod producer;

use metrics::TCacheMetrics;

/// Single or multi producer, multi consumer cache buffer with a Tail
///
/// _ /)---(\
Expand All @@ -39,6 +42,9 @@ pub struct TCache {
head: TCacheHead,
len: u32,
data: Box<[u8]>,
/// `None` when name is empty, or if mmap'ing the metrics file
/// fails (tile continues to function without surfer visibility).
metrics: Option<TCacheMetrics>,
}

#[derive(Copy, Clone, Debug)]
Expand Down Expand Up @@ -107,6 +113,10 @@ impl TCache {
})
.ok_or(Error::MaxConsumers)?;

// Publish a baseline tail so surfer's chart has a value to plot
// before the consumer has actually called free.
self.record_tail(index, seq);

Ok(Consumer {
cache: TCacheRef { cache: addr_of!(*self) as *const c_void },
index,
Expand All @@ -128,6 +138,8 @@ impl TCache {
})
.ok_or(Error::MaxConsumers)?;

self.record_tail(index, seq);

Ok(RandomAccessConsumer {
cache: TCacheRef { cache: addr_of!(*self) as *const c_void },
index,
Expand Down Expand Up @@ -286,7 +298,13 @@ impl TCache {
if success {
slot.skip = 0;
}
let new_head = seq + slot.reservation_len as u64;
slot.seq = AtomicU64::new(seq);
// Track the producer's actual progress for the metrics layer.
// `head.seq` (visible to joining consumers) is only updated by
// `publish_head` on out-of-space, but surfer wants the live
// production cursor.
self.record_head(new_head);
}

fn alloc_tache(name: &'static str, size: usize) -> Box<Self> {
Expand All @@ -295,6 +313,15 @@ impl TCache {
"n is not mutiple of {ALIGN}"
);
let layout = Layout::from_size_align(size, ALIGN).unwrap();
let metrics = if name.is_empty() {
None
} else {
// MAX_CONSUMERS — overcount is fine for the file size; surfer
// ignores never-set tails (they stay at u64::MAX sentinel).
TCacheMetrics::new(name, MAX_CONSUMERS, size as u64)
.map_err(|e| tracing::warn!(?name, ?e, "TCacheMetrics::new failed"))
.ok()
};
unsafe {
let ptr = alloc::alloc_zeroed(layout);
if ptr.is_null() {
Expand All @@ -309,9 +336,31 @@ impl TCache {
},
len: data.len() as u32,
data,
metrics,
})
}
}

#[inline]
pub(super) fn record_head(&self, seq: u64) {
if let Some(m) = &self.metrics {
m.set_head_seq(seq);
}
}

/// Record a consumer's tail seq into the per-tcache metrics slot.
/// A seq of 0 is interpreted as "no real progress" and is mapped
/// to the `u64::MAX` sentinel — surfer then treats it as
/// "tail == head" rather than dragging `min_tail` to 0. The
/// in-memory `head.tails[idx]` is unaffected; this is purely the
/// metrics view.
#[inline]
pub(super) fn record_tail(&self, idx: usize, seq: u64) {
if let Some(m) = &self.metrics {
let v = if seq == 0 { u64::MAX } else { seq };
m.set_tail_seq(idx, v);
}
}
}

#[repr(C)]
Expand Down
58 changes: 33 additions & 25 deletions crates/common/src/spine/tcache/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ impl Consumer {
//tracing::warn!("consumer free: {}", self.seq);
self.seq = self.next_seq;
self.cache.head.tails[self.index].store(self.seq, Ordering::Release);
self.cache.record_tail(self.index, self.seq);
}
}

Expand All @@ -103,7 +104,12 @@ impl RandomAccessConsumer {
/// visible to the Producer.
pub fn free(&self) {
let tail = self.active.tail_seq;
self.cache.head.tails[self.index].store(tail, Ordering::Release);

if tail != u64::MAX {
//tracing::warn!("Random consumer free {tail}");
self.cache.head.tails[self.index].store(tail, Ordering::Release);
self.cache.record_tail(self.index, tail);
}
}

fn release(&mut self, seq: u64) {
Expand Down Expand Up @@ -168,6 +174,7 @@ pub(super) struct Buckets {
head_seq: u64,
bucket_size: u64,
bucket_shift: u64,
bucket_mask: u64,
// max difference between head and tail, before 'forced' eviction
lag_threshold: u64,
}
Expand All @@ -188,56 +195,55 @@ impl Buckets {
head_seq: 0,
bucket_size,
bucket_shift: bucket_size.trailing_zeros() as u64,
bucket_mask: !(bucket_size - 1),
lag_threshold: (LAG_PERCENT * (cache_capacity as f64)) as u64,
}
}

fn acquire(&mut self, seq: u64) {
let bucket_idx = self.index(seq);
let bucket_idx = self.bucket_index(seq);
self.buckets[bucket_idx] += 1;

self.head_seq = self.head_seq.max(seq);

if self.tail_seq == u64::MAX {
self.tail_seq = seq & !(self.bucket_size - 1);
self.tail_seq = self.bucket_start_seq(seq);
}

// check lagging
if self.head_seq - self.tail_seq > self.lag_threshold {
let mut tail_idx = self.index(self.tail_seq);
self.buckets[tail_idx] = 0;

while self.buckets[tail_idx] == 0 && (self.tail_seq + self.bucket_size) < self.head_seq
{
tail_idx = (tail_idx + 1) & (self.buckets.len() - 1);
self.tail_seq += self.bucket_size;
// rollup tail for completed buckets
let head_bucket_seq = self.bucket_start_seq(seq);
while head_bucket_seq > self.tail_seq {
let tail_bucket = self.bucket_index(self.tail_seq);
if self.head_seq - self.tail_seq > self.lag_threshold {
tracing::warn!("unfreed lagging consumers dropped!");
self.buckets[tail_bucket] = 0;
}
if self.buckets[tail_bucket] != 0 {
break;
}
self.tail_seq += self.bucket_size;
}
}

fn release(&mut self, seq: u64) {
if seq < self.tail_seq {
tracing::warn!("tried to release: {seq} which is < {}", self.tail_seq);
return;
}

let mut bucket_idx = self.index(seq);
let bucket_idx = self.bucket_index(seq);
self.buckets[bucket_idx] = self.buckets[bucket_idx].saturating_sub(1);

let mut bucket_tail_seq = seq & !(self.bucket_size - 1);
if bucket_tail_seq == self.tail_seq {
while self.buckets[bucket_idx] == 0 &&
(bucket_tail_seq + self.bucket_size) < self.head_seq
{
bucket_idx = (bucket_idx + 1) & (self.buckets.len() - 1);
bucket_tail_seq += self.bucket_size;
}
self.tail_seq = bucket_tail_seq;
}
}

fn index(&self, seq: u64) -> usize {
#[inline]
fn bucket_index(&self, seq: u64) -> usize {
((seq >> self.bucket_shift) as usize) & (self.buckets.len() - 1)
}

#[inline]
fn bucket_start_seq(&self, seq: u64) -> u64 {
seq & self.bucket_mask
}
}

#[cfg(test)]
Expand Down Expand Up @@ -268,6 +274,7 @@ mod tests {
b.acquire(200); // bucket 3, head = 200
assert_eq!(b.tail_seq, 0);
b.release(0); // bucket 0 empties; head far enough ahead to advance
b.acquire(300);
assert!(b.tail_seq > 0, "tail did not advance: {}", b.tail_seq);
assert!(b.tail_seq <= 200);
}
Expand All @@ -283,6 +290,7 @@ mod tests {
assert_eq!(b.tail_seq, 0, "tail moved while bucket 0 still held");
// Release the head; tail jumps past bucket 0 and bucket 1 (both empty)
b.release(0);
b.acquire(350);
assert!(b.tail_seq >= 128, "tail did not jump: {}", b.tail_seq);
}

Expand Down
Loading
Loading