diff --git a/litebox_broker_host/src/lib.rs b/litebox_broker_host/src/lib.rs index 20fd3cb3d..c4f311c2e 100644 --- a/litebox_broker_host/src/lib.rs +++ b/litebox_broker_host/src/lib.rs @@ -3,9 +3,9 @@ //! Channel-neutral broker-side protocol/core adapter. //! -//! This crate wires `litebox_broker_core` to any implementation of the neutral -//! host-side control-channel trait. Concrete channels live in separate crates such as -//! `litebox_broker_transport`. +//! This crate wires `litebox_broker_core` to implementations of the neutral +//! host-side control and notification channel traits. Concrete channels live in +//! separate crates such as `litebox_broker_transport`. #![no_std] @@ -13,13 +13,16 @@ extern crate std; use litebox_broker_core::{BrokerCore, BrokerSession, CallerCredential}; -use litebox_broker_protocol::BROKER_PROTOCOL_VERSION; -use litebox_broker_protocol::channel::{HostControlChannel, HostReceive, PeerCredential}; +use litebox_broker_protocol::channel::{ + HostControlChannel, HostNotificationChannel, HostReceive, PeerCredential, +}; use litebox_broker_protocol::error::ErrorCode; use litebox_broker_protocol::event::{AddEventResponse, CreateEventResponse, WaitEventResponse}; use litebox_broker_protocol::message::{ - BrokerHandshakeResponse, BrokerRequest, BrokerResponse, EventRequest, EventResponse, + BrokerHandshakeResponse, BrokerNotification, BrokerRequest, BrokerResponse, + EventReadinessNotification, EventRequest, EventResponse, }; +use litebox_broker_protocol::{BROKER_PROTOCOL_VERSION, ObjectHandle}; mod error; @@ -30,6 +33,41 @@ pub fn serve_connection( core: &BrokerCore, channel: &mut Channel, ) -> Result +where + Channel: HostControlChannel, +{ + let session = create_session(core, channel)?; + if let Some(termination) = negotiate_protocol(channel)? { + return Ok(termination); + } + serve_request_loop(channel, &session) +} + +/// Authenticates, negotiates, and serves one broker connection with notifications. +/// +/// Control requests and responses remain strictly paired on the control channel. +/// Readiness notifications are sent over the separate notification channel after +/// state-changing event requests complete. +pub fn serve_connection_with_notifications( + core: &BrokerCore, + control_channel: &mut ControlChannel, + notification_channel: &mut NotificationChannel, +) -> Result +where + ControlChannel: HostControlChannel, + NotificationChannel: HostNotificationChannel, +{ + let session = create_session(core, control_channel)?; + if let Some(termination) = negotiate_protocol(control_channel)? { + return Ok(termination); + } + serve_request_loop_with_notifications(control_channel, &session, notification_channel) +} + +fn create_session( + core: &BrokerCore, + channel: &Channel, +) -> Result where Channel: HostControlChannel, { @@ -40,8 +78,15 @@ where PeerCredential::Unauthenticated => CallerCredential::Unauthenticated, _ => return Err(BrokerHostError::Broker(ErrorCode::PolicyDenied)), }; - let session = core.create_session(caller_credential)?; + Ok(core.create_session(caller_credential)?) +} +fn negotiate_protocol( + channel: &mut Channel, +) -> Result, Channel::Error> +where + Channel: HostControlChannel, +{ loop { let request = match channel .recv_handshake_request() @@ -54,9 +99,9 @@ where ErrorCode::ProtocolState, )) .map_err(BrokerHostError::Channel)?; - return Ok(ConnectionTermination::ProtocolViolation); + return Ok(Some(ConnectionTermination::ProtocolViolation)); } - HostReceive::PeerClosed => return Ok(ConnectionTermination::PeerClosed), + HostReceive::PeerClosed => return Ok(Some(ConnectionTermination::PeerClosed)), }; let negotiated = request.protocol_version == BROKER_PROTOCOL_VERSION; @@ -77,7 +122,7 @@ where } } - serve_request_loop(channel, &session) + Ok(None) } fn serve_request_loop( @@ -108,6 +153,45 @@ where Ok(ConnectionTermination::PeerClosed) } +fn serve_request_loop_with_notifications( + control_channel: &mut ControlChannel, + session: &BrokerSession, + notification_channel: &mut NotificationChannel, +) -> Result +where + ControlChannel: HostControlChannel, + NotificationChannel: HostNotificationChannel, +{ + loop { + let request = match control_channel + .recv_request() + .map_err(BrokerHostError::Channel)? + { + HostReceive::Message(request) => request, + HostReceive::ProtocolViolation => { + control_channel + .send_response(&BrokerResponse::Error(ErrorCode::ProtocolState)) + .map_err(BrokerHostError::Channel)?; + return Ok(ConnectionTermination::ProtocolViolation); + } + HostReceive::PeerClosed => break, + }; + + let readiness_handle = event_readiness_notification_handle(&request); + let response = handle_request(session, request); + control_channel + .send_response(&response) + .map_err(BrokerHostError::Channel)?; + if let Some(notification) = event_readiness_notification(readiness_handle, &response) { + notification_channel + .send_notification(¬ification) + .map_err(BrokerHostError::Channel)?; + } + } + + Ok(ConnectionTermination::PeerClosed) +} + fn handle_request(session: &BrokerSession, request: BrokerRequest) -> BrokerResponse { match request { BrokerRequest::CloseObject(handle) => match session.close_object_reference(handle) { @@ -153,6 +237,35 @@ fn handle_event_request(session: &BrokerSession, request: EventRequest) -> Broke } } +fn event_readiness_notification_handle(request: &BrokerRequest) -> Option { + match request { + BrokerRequest::Event(EventRequest::Add(request)) => Some(request.handle), + BrokerRequest::Event(EventRequest::Consume(request)) => Some(request.handle), + _ => None, + } +} + +fn event_readiness_notification( + handle: Option, + response: &BrokerResponse, +) -> Option { + match (handle, response) { + (Some(handle), BrokerResponse::Event(EventResponse::Add(response))) => Some( + BrokerNotification::EventReadiness(EventReadinessNotification { + handle, + readiness: response.readiness, + }), + ), + (Some(handle), BrokerResponse::Event(EventResponse::Consume(response))) => Some( + BrokerNotification::EventReadiness(EventReadinessNotification { + handle, + readiness: response.readiness, + }), + ), + _ => None, + } +} + /// Terminal outcome after processing one broker connection. #[derive(Clone, Copy, Debug, PartialEq, Eq)] #[non_exhaustive] diff --git a/litebox_broker_protocol/src/channel.rs b/litebox_broker_protocol/src/channel.rs index 728053ca0..06a71b46d 100644 --- a/litebox_broker_protocol/src/channel.rs +++ b/litebox_broker_protocol/src/channel.rs @@ -2,7 +2,8 @@ // Licensed under the MIT license. use crate::message::{ - BrokerHandshakeRequest, BrokerHandshakeResponse, BrokerRequest, BrokerResponse, + BrokerHandshakeRequest, BrokerHandshakeResponse, BrokerNotification, BrokerRequest, + BrokerResponse, }; /// Peer identity information supplied by the channel or host layer. @@ -85,3 +86,32 @@ pub trait HostControlChannel { /// Sends one active broker response. fn send_response(&mut self, response: &BrokerResponse) -> Result<(), Self::Error>; } + +/// Local-side receive channel for broker-initiated asynchronous notifications. +/// +/// A notification channel is separate from the control channel so active broker +/// requests remain strictly paired with their responses. The deployment is +/// responsible for binding this channel to the same authenticated broker +/// association as the matching control channel. +pub trait LocalNotificationChannel { + /// Channel-specific error type. + type Error; + + /// Receives one broker notification. + /// + /// Returns `Ok(None)` when the broker closed the channel cleanly before + /// starting another notification frame. + fn recv_notification(&mut self) -> Result, Self::Error>; +} + +/// Host-side send channel for broker-initiated asynchronous notifications. +/// +/// Implementations carry notification frames only; object operation responses +/// continue to use [`HostControlChannel::send_response`]. +pub trait HostNotificationChannel { + /// Channel-specific error type. + type Error; + + /// Sends one broker notification. + fn send_notification(&mut self, notification: &BrokerNotification) -> Result<(), Self::Error>; +} diff --git a/litebox_broker_protocol/src/message.rs b/litebox_broker_protocol/src/message.rs index 36c866637..bb51f177e 100644 --- a/litebox_broker_protocol/src/message.rs +++ b/litebox_broker_protocol/src/message.rs @@ -4,7 +4,7 @@ use crate::error::ErrorCode; use crate::event::{ AddEventRequest, AddEventResponse, ConsumeEventRequest, ConsumeEventResponse, - CreateEventRequest, CreateEventResponse, WaitEventRequest, WaitEventResponse, + CreateEventRequest, CreateEventResponse, ReadinessState, WaitEventRequest, WaitEventResponse, }; use crate::{ObjectHandle, ProtocolVersion}; @@ -84,3 +84,23 @@ pub enum EventResponse { /// Consume operation response. Consume(ConsumeEventResponse), } + +/// Broker-initiated asynchronous notification. +/// +/// Notifications are level-triggered snapshots and may be coalesced or +/// duplicated by a transport. Local waiters must treat them as wakeups to +/// re-check authoritative state, not as ordered state transitions. +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum BrokerNotification { + /// Readiness changed or should be re-checked for a broker-owned event object. + EventReadiness(EventReadinessNotification), +} + +/// Readiness notification for a broker-owned event object. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub struct EventReadinessNotification { + /// Event object handle. + pub handle: ObjectHandle, + /// Current broker-authoritative readiness snapshot. + pub readiness: ReadinessState, +} diff --git a/litebox_broker_protocol/src/wire.rs b/litebox_broker_protocol/src/wire.rs index 4e2ac0644..c9bb49a85 100644 --- a/litebox_broker_protocol/src/wire.rs +++ b/litebox_broker_protocol/src/wire.rs @@ -20,7 +20,8 @@ use thiserror::Error; use crate::error::ErrorCode; use crate::message::{ - BrokerHandshakeRequest, BrokerHandshakeResponse, BrokerRequest, BrokerResponse, + BrokerHandshakeRequest, BrokerHandshakeResponse, BrokerNotification, BrokerRequest, + BrokerResponse, EventReadinessNotification, }; use primitive::{Decoder, Encoder}; @@ -38,6 +39,8 @@ const RESPONSE_TAG_ERROR: u8 = 2; const RESPONSE_TAG_VERSION_MISMATCH: u8 = 3; const RESPONSE_TAG_OBJECT_CLOSED: u8 = 4; +const NOTIFICATION_TAG_EVENT_READINESS: u8 = 0; + /// Error produced while encoding or decoding a broker wire message. #[derive(Clone, Copy, Debug, Error, PartialEq, Eq)] #[non_exhaustive] @@ -208,6 +211,39 @@ pub fn decode_response(frame: &[u8]) -> Result { Ok(response) } +/// Encodes a broker notification body. +/// +/// Successful encodings are always non-empty because the first byte is the +/// message tag. +pub fn encode_notification(notification: BrokerNotification) -> Vec { + let mut encoder = Encoder::default(); + match notification { + BrokerNotification::EventReadiness(notification) => { + encoder.u8(NOTIFICATION_TAG_EVENT_READINESS); + encoder.handle(notification.handle); + event::encode_readiness(&mut encoder, notification.readiness); + } + } + encoder.finish() +} + +/// Decodes a broker notification body. +pub fn decode_notification(frame: &[u8]) -> Result { + let mut decoder = Decoder::new(frame); + let tag = decoder.u8()?; + let notification = match tag { + NOTIFICATION_TAG_EVENT_READINESS => { + BrokerNotification::EventReadiness(EventReadinessNotification { + handle: decoder.handle()?, + readiness: event::decode_readiness(&mut decoder)?, + }) + } + _ => return Err(WireError::InvalidTag), + }; + decoder.finish()?; + Ok(notification) +} + #[cfg(test)] mod tests { use super::*; @@ -329,6 +365,27 @@ mod tests { } } + #[test] + fn notification_codec_round_trips_all_variants() { + let handle = ObjectHandle(13); + let notifications = [BrokerNotification::EventReadiness( + EventReadinessNotification { + handle, + readiness: ReadinessState { + read_ready: true, + write_ready: false, + }, + }, + )]; + + for notification in notifications { + assert_eq!( + decode_notification(&encode_notification(notification.clone())).unwrap(), + notification + ); + } + } + #[test] fn decode_rejects_malformed_handshake_request_frames() { assert_eq!( @@ -461,6 +518,48 @@ mod tests { assert_eq!(decode_response(&frame), Err(WireError::TrailingBytes)); } + #[test] + fn decode_rejects_malformed_notification_frames() { + assert_eq!( + decode_notification(&[0xff, 1, 2, 3]), + Err(WireError::InvalidTag) + ); + assert_eq!( + decode_notification(&[NOTIFICATION_TAG_EVENT_READINESS]), + Err(WireError::TruncatedFrame) + ); + + let mut invalid_bool = encode_notification(BrokerNotification::EventReadiness( + EventReadinessNotification { + handle: ObjectHandle(13), + readiness: ReadinessState { + read_ready: true, + write_ready: false, + }, + }, + )); + *invalid_bool.last_mut().unwrap() = 0xff; + assert_eq!( + decode_notification(&invalid_bool), + Err(WireError::InvalidBoolean) + ); + + let mut trailing = encode_notification(BrokerNotification::EventReadiness( + EventReadinessNotification { + handle: ObjectHandle(13), + readiness: ReadinessState { + read_ready: true, + write_ready: false, + }, + }, + )); + trailing.push(0xff); + assert_eq!( + decode_notification(&trailing), + Err(WireError::TrailingBytes) + ); + } + #[test] fn event_add_response_wire_shape_is_pinned() { assert_eq!( @@ -475,4 +574,20 @@ mod tests { [1, 2, 1, 0] ); } + + #[test] + fn event_readiness_notification_wire_shape_is_pinned() { + assert_eq!( + encode_notification(BrokerNotification::EventReadiness( + EventReadinessNotification { + handle: ObjectHandle(13), + readiness: ReadinessState { + read_ready: true, + write_ready: false, + }, + } + )), + [0, 13, 0, 0, 0, 0, 0, 0, 0, 1, 0] + ); + } } diff --git a/litebox_broker_protocol/src/wire/event.rs b/litebox_broker_protocol/src/wire/event.rs index 7892f94cd..0810b03b7 100644 --- a/litebox_broker_protocol/src/wire/event.rs +++ b/litebox_broker_protocol/src/wire/event.rs @@ -114,12 +114,12 @@ pub(super) fn decode_event_response(decoder: &mut Decoder<'_>) -> Result) -> Result { +pub(super) fn decode_readiness(decoder: &mut Decoder<'_>) -> Result { Ok(ReadinessState { read_ready: decoder.bool()?, write_ready: decoder.bool()?, diff --git a/litebox_broker_transport/src/unix_socket.rs b/litebox_broker_transport/src/unix_socket.rs index 877507d5c..d02fe784d 100644 --- a/litebox_broker_transport/src/unix_socket.rs +++ b/litebox_broker_transport/src/unix_socket.rs @@ -13,15 +13,17 @@ use std::path::Path; use std::time::{Duration, Instant}; use litebox_broker_protocol::channel::{ - HostControlChannel, HostReceive, LocalControlChannel, PeerCredential, + HostControlChannel, HostNotificationChannel, HostReceive, LocalControlChannel, + LocalNotificationChannel, PeerCredential, }; use litebox_broker_protocol::message::{ - BrokerHandshakeRequest, BrokerHandshakeResponse, BrokerRequest, BrokerResponse, + BrokerHandshakeRequest, BrokerHandshakeResponse, BrokerNotification, BrokerRequest, + BrokerResponse, }; use litebox_broker_protocol::wire::{ - WireError, decode_handshake_request, decode_handshake_response, decode_request, - decode_response, encode_handshake_request, encode_handshake_response, encode_request, - encode_response, + WireError, decode_handshake_request, decode_handshake_response, decode_notification, + decode_request, decode_response, encode_handshake_request, encode_handshake_response, + encode_notification, encode_request, encode_response, }; const MAX_FRAME_LEN: usize = 64 * 1024; @@ -67,6 +69,16 @@ pub struct UnixStreamHostControlChannel { stream: UnixStream, } +/// Local-side Unix-domain-socket notification channel for the hosted userland POC. +pub struct UnixStreamLocalNotificationChannel { + stream: UnixStream, +} + +/// Host-side Unix-domain-socket notification channel for the hosted userland POC. +pub struct UnixStreamHostNotificationChannel { + stream: UnixStream, +} + impl UnixStreamHostControlChannel { /// Creates a host control channel from an accepted Unix stream. pub const fn from_accepted(stream: UnixStream) -> Self { @@ -74,6 +86,25 @@ impl UnixStreamHostControlChannel { } } +impl UnixStreamLocalNotificationChannel { + /// Creates a local notification channel from an already-connected Unix stream. + pub const fn from_connected(stream: UnixStream) -> Self { + Self { stream } + } + + /// Connects to a userland broker Unix notification socket. + pub fn connect(path: impl AsRef) -> IoResult { + UnixStream::connect(path).map(Self::from_connected) + } +} + +impl UnixStreamHostNotificationChannel { + /// Creates a host notification channel from an accepted Unix stream. + pub const fn from_accepted(stream: UnixStream) -> Self { + Self { stream } + } +} + impl LocalControlChannel for UnixStreamLocalControlChannel { type Error = Error; @@ -153,6 +184,29 @@ impl HostControlChannel for UnixStreamHostControlChannel { } } +impl LocalNotificationChannel for UnixStreamLocalNotificationChannel { + type Error = Error; + + fn recv_notification(&mut self) -> IoResult> { + match read_frame_with_deadline(&mut self.stream, None)? { + Some(frame) => decode_notification(&frame).map(Some).map_err(wire_error), + None => Ok(None), + } + } +} + +impl HostNotificationChannel for UnixStreamHostNotificationChannel { + type Error = Error; + + fn send_notification(&mut self, notification: &BrokerNotification) -> IoResult<()> { + write_frame_with_deadline( + &mut self.stream, + &encode_notification(notification.clone()), + None, + ) + } +} + fn read_frame_with_deadline( stream: &mut UnixStream, deadline: Option, @@ -383,4 +437,24 @@ mod tests { HostReceive::ProtocolViolation ); } + + #[test] + fn notification_frame_round_trip() { + let (local_stream, host_stream) = UnixStream::pair().unwrap(); + let mut local = UnixStreamLocalNotificationChannel::from_connected(local_stream); + let mut host = UnixStreamHostNotificationChannel::from_accepted(host_stream); + let notification = BrokerNotification::EventReadiness( + litebox_broker_protocol::message::EventReadinessNotification { + handle: litebox_broker_protocol::ObjectHandle(7), + readiness: litebox_broker_protocol::event::ReadinessState { + read_ready: true, + write_ready: false, + }, + }, + ); + + host.send_notification(¬ification).unwrap(); + + assert_eq!(local.recv_notification().unwrap(), Some(notification)); + } } diff --git a/litebox_broker_userland/tests/notification_channel.rs b/litebox_broker_userland/tests/notification_channel.rs new file mode 100644 index 000000000..6c4104d68 --- /dev/null +++ b/litebox_broker_userland/tests/notification_channel.rs @@ -0,0 +1,182 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +use std::io::{Error, ErrorKind, Result}; +use std::os::unix::net::{UnixListener, UnixStream}; +use std::path::{Path, PathBuf}; +use std::time::{Duration, Instant}; + +use litebox_broker_core::{BrokerCore, PolicyEngine, PrincipalRights}; +use litebox_broker_host::{ConnectionTermination, serve_connection_with_notifications}; +use litebox_broker_local::BrokerLocal; +use litebox_broker_protocol::channel::LocalNotificationChannel; +use litebox_broker_protocol::message::{BrokerNotification, EventReadinessNotification}; +use litebox_broker_transport::unix_socket::{ + UnixStreamHostControlChannel, UnixStreamHostNotificationChannel, UnixStreamLocalControlChannel, + UnixStreamLocalNotificationChannel, +}; + +const TEST_TIMEOUT: Duration = Duration::from_secs(5); + +#[test] +fn broker_sends_event_readiness_notification_over_userland_channel() { + let control_socket_path = unique_socket_path("control"); + let notification_socket_path = unique_socket_path("notification"); + let broker = spawn_test_broker( + control_socket_path.clone(), + notification_socket_path.clone(), + ); + + let mut notifications = connect_notification_with_retry(¬ification_socket_path).unwrap(); + let control_channel = connect_control_with_retry(&control_socket_path).unwrap(); + let mut local = BrokerLocal::negotiate(control_channel).unwrap(); + + let handle = local.create_event_with_count(0).unwrap(); + let readiness = local.add_event(handle, 1).unwrap(); + assert_eq!( + notifications.recv_notification().unwrap(), + Some(BrokerNotification::EventReadiness( + EventReadinessNotification { handle, readiness } + )) + ); + + drop(local); + broker.join(); +} + +fn spawn_test_broker( + control_socket_path: PathBuf, + notification_socket_path: PathBuf, +) -> TestBroker { + let _ = std::fs::remove_file(&control_socket_path); + let _ = std::fs::remove_file(¬ification_socket_path); + let (ready_tx, ready_rx) = std::sync::mpsc::channel(); + let thread_control_socket_path = control_socket_path.clone(); + let thread_notification_socket_path = notification_socket_path.clone(); + let thread = std::thread::spawn(move || { + let control_listener = UnixListener::bind(&thread_control_socket_path).unwrap(); + let notification_listener = UnixListener::bind(&thread_notification_socket_path).unwrap(); + control_listener.set_nonblocking(true).unwrap(); + notification_listener.set_nonblocking(true).unwrap(); + ready_tx.send(()).unwrap(); + + let notification_stream = accept_with_retry(¬ification_listener).unwrap(); + let control_stream = accept_with_retry(&control_listener).unwrap(); + let broker = BrokerCore::new(PolicyEngine::with_unauthenticated_rights( + PrincipalRights::all(), + )) + .unwrap(); + let mut control_channel = UnixStreamHostControlChannel::from_accepted(control_stream); + let mut notification_channel = + UnixStreamHostNotificationChannel::from_accepted(notification_stream); + + assert_eq!( + serve_connection_with_notifications( + &broker, + &mut control_channel, + &mut notification_channel, + ) + .unwrap(), + ConnectionTermination::PeerClosed + ); + }); + + ready_rx + .recv_timeout(TEST_TIMEOUT) + .expect("broker test host did not start"); + TestBroker { + thread: Some(thread), + control_socket_path, + notification_socket_path, + } +} + +struct TestBroker { + thread: Option>, + control_socket_path: PathBuf, + notification_socket_path: PathBuf, +} + +impl TestBroker { + fn join(mut self) { + self.thread + .take() + .expect("broker test host thread missing") + .join() + .expect("broker test host panicked"); + } + + fn cleanup(&self) { + let _ = std::fs::remove_file(&self.control_socket_path); + let _ = std::fs::remove_file(&self.notification_socket_path); + } +} + +impl Drop for TestBroker { + fn drop(&mut self) { + self.cleanup(); + } +} + +fn connect_control_with_retry(socket_path: &Path) -> Result { + let deadline = Instant::now() + TEST_TIMEOUT; + loop { + match UnixStreamLocalControlChannel::connect_with_setup_deadline(socket_path, deadline) { + Ok(channel) => return Ok(channel), + Err(error) if retry_socket_connect(&error, deadline) => { + std::thread::sleep(Duration::from_millis(10)); + } + Err(error) => return Err(error), + } + } +} + +fn connect_notification_with_retry( + socket_path: &Path, +) -> Result { + let deadline = Instant::now() + TEST_TIMEOUT; + loop { + match UnixStream::connect(socket_path) { + Ok(stream) => { + stream.set_read_timeout(Some(TEST_TIMEOUT))?; + return Ok(UnixStreamLocalNotificationChannel::from_connected(stream)); + } + Err(error) if retry_socket_connect(&error, deadline) => { + std::thread::sleep(Duration::from_millis(10)); + } + Err(error) => return Err(error), + } + } +} + +fn retry_socket_connect(error: &Error, deadline: Instant) -> bool { + Instant::now() < deadline + && matches!( + error.kind(), + ErrorKind::NotFound | ErrorKind::ConnectionRefused + ) +} + +fn accept_with_retry(listener: &UnixListener) -> Result { + let deadline = Instant::now() + TEST_TIMEOUT; + loop { + match listener.accept() { + Ok((stream, _)) => return Ok(stream), + Err(error) if error.kind() == ErrorKind::WouldBlock && Instant::now() < deadline => { + std::thread::sleep(Duration::from_millis(10)); + } + Err(error) => return Err(error), + } + } +} + +fn unique_socket_path(name: &str) -> PathBuf { + let nonce = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_nanos(); + std::env::temp_dir().join(format!( + "litebox-broker-userland-notification-channel-{name}-{}-{nonce}.sock", + std::process::id() + )) +}