Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 7 additions & 17 deletions src/runtime.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
use crate::runtime::threads::{
LocalThreadAccessError, LocalThreadState, SharedThreadInfo, ShortThreadId,
};
use crate::runtime::threads::{LocalThreadAccessError, SharedThreadInfo, ShortThreadId};
use arbitrary_int::prelude::*;
use cfg_if::cfg_if;
use core::ffi::c_void;
Expand All @@ -13,6 +11,9 @@ use core::sync::atomic::Ordering;

mod threads;

// TODO: Eliminate this?
pub use threads::LocalThreadState;

/// An error returned by [`Brc::biased_count`],
/// either caused by being the wrong thread or not being biased at all.
///
Expand Down Expand Up @@ -78,24 +79,13 @@ impl RawBrcHeader {
///
/// # Safety
/// The resulting header must be pinned in-memory before it is ever used.
/// The thread id must be correct for this thread, or be `None`.
///
/// # Panics
/// This function will never unwind, although it may abort.
#[inline]
pub unsafe fn init() -> Self {
let this_id = match LocalThreadState::existing_short_id() {
Ok(short_id) => Some(short_id),
Err(LocalThreadAccessError::Dead | LocalThreadAccessError::IdOverflow(_)) => {
// in this case, the local state was already initialized,
// but we cannot participate in biased reference counting
None
}
Err(LocalThreadAccessError::Uninitialized) => {
// Need to actually initialize the thread state
LocalThreadState::init_tid()
}
};
match this_id {
pub unsafe fn init_with(this_thread_id: Option<ShortThreadId>) -> Self {
match this_thread_id {
None => RawBrcHeader {
shared_word: AtomicUsize::new(
SharedWord {
Expand Down
28 changes: 28 additions & 0 deletions src/runtime/threads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,34 @@ impl LocalThreadState {
nounwind::abort_unwind(|| LocalThreadState::with_current(LocalThreadState::short_id).ok())
}

/// Obtain the short id of the current thread, first running
/// [`crate::collect`] or the first-time thread-state initialization
/// when the current thread state calls for it.
///
/// Returns `None` right away when the thread state is `DeadOrDying`.
/// An `Active` thread for which `currently_needs_collect` reports `false`
/// gets the id that was read from the thread-local fast state;
/// every other case is delegated to the out-of-line slow path.
///
/// This function is a performance experiment.
#[inline]
pub fn collect_or_init_tid() -> Option<ShortThreadId> {
    // Take a snapshot of both cells in a single thread-local access.
    let (status, cached_id) = THIS_THREAD_STATE_FAST.with(|fast| {
        let status = fast.status.get();
        let short_id = fast.short_id.get();
        (status, short_id)
    });
    match status {
        LocalThreadStatus::DeadOrDying => None,
        // Fast path: already initialized and nothing pending to collect.
        LocalThreadStatus::Active if !Self::currently_needs_collect() => cached_id,
        // Either uninitialized, or active with a pending collection.
        LocalThreadStatus::Uninit | LocalThreadStatus::Active => {
            Self::collect_or_init_tid_slow()
        }
    }
}
/// Out-of-line slow path of [`Self::collect_or_init_tid`].
///
/// When `currently_needs_collect` reports `true`, runs [`crate::collect`]
/// and then returns the short id stored in the thread-local fast state.
/// Otherwise the result of `init_tid` is returned.
///
/// The decision is re-evaluated here rather than passed in by the caller.
// NOTE(review): this presumably relies on `currently_needs_collect` not
// flipping from `true` to `false` between the caller's check and this one
// for an already-active thread — confirm, since the fallback is `init_tid`.
#[cold]
#[inline(never)]
fn collect_or_init_tid_slow() -> Option<ShortThreadId> {
    if !Self::currently_needs_collect() {
        return Self::init_tid();
    }
    crate::collect();
    // Read the id only after the collection has run.
    THIS_THREAD_STATE_FAST.with(|fast| fast.short_id.get())
}

/// Access the current thread info inside the specified closure.
///
/// # Safety
Expand Down
19 changes: 9 additions & 10 deletions src/strong.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,7 @@ impl<T, A: Allocator> Brc<T, A> {
/// using a particular allocator.
#[inline]
pub fn new_in(value: T, alloc: A) -> Self {
// There is no advantage to waiting for BrcRawHeader::init to initialize the thread state.
// This is because if the thread state is uninitialized,
// the queue is empty and collection would be a no-op anyway.
#[cfg(not(biasedrc_no_implicit_collect))]
collect();
let thread_id = runtime::LocalThreadState::collect_or_init_tid();
// This function used to be implemented as Self::new_with(|| value).
// While correct, this caused code bloat and was noticeably slower than Arc::new.
//
Expand All @@ -125,7 +121,8 @@ impl<T, A: Allocator> Brc<T, A> {
// If we are wrong, we just leak the newly allocated memory
let header = BrcHeader {
// SAFETY: Pinned in memory immediately after construction.
strong: unsafe { RawBrcHeader::init() },
// Thread id is correct for this thread.
strong: unsafe { RawBrcHeader::init_with(thread_id) },
weak_count: AtomicU32::new(1),
alloc: ManuallyDrop::new(alloc),
};
Expand Down Expand Up @@ -299,8 +296,7 @@ impl<T: ?Sized + SupportedPointee, A: Allocator> Brc<T, A> {
func: impl FnOnce(*mut T),
alloc: A,
) -> Self {
#[cfg(not(biasedrc_no_implicit_collect))]
collect();
let this_thread_id = runtime::LocalThreadState::collect_or_init_tid();
let layout = LayoutInfo::<A>::new_or_panic(layout);
struct CleanupGuard<A: Allocator> {
ptr: NonNull<u8>,
Expand Down Expand Up @@ -330,9 +326,12 @@ impl<T: ?Sized + SupportedPointee, A: Allocator> Brc<T, A> {
}
// SAFETY: Memory is newly allocated so it is known to be valid
// The RawBrcHeader is pinned immediately after it is created
// we just verified above that the field offset is zero
// and we just verified above that the field offset is zero.
// We also know that `this_thread_id` is valid
unsafe {
allocated.cast::<RawBrcHeader>().write(RawBrcHeader::init());
allocated
.cast::<RawBrcHeader>()
.write(RawBrcHeader::init_with(this_thread_id));
}
// SAFETY: Newly allocated memory is valid
unsafe {
Expand Down