From 0ddf5a88fa4b2277b533450abebfebbc8fb484ae Mon Sep 17 00:00:00 2001 From: Weidong Cui Date: Fri, 26 Jun 2026 12:35:27 -0700 Subject: [PATCH 1/6] Add broker readiness notification channel skeleton Define broker-to-local notification messages and channel traits, including event readiness snapshots, and add Unix-socket framing support for notification traffic. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- litebox_broker_protocol/src/channel.rs | 32 +++++- litebox_broker_protocol/src/message.rs | 22 +++- litebox_broker_protocol/src/wire.rs | 117 +++++++++++++++++++- litebox_broker_protocol/src/wire/event.rs | 4 +- litebox_broker_transport/src/unix_socket.rs | 84 +++++++++++++- 5 files changed, 249 insertions(+), 10 deletions(-) diff --git a/litebox_broker_protocol/src/channel.rs b/litebox_broker_protocol/src/channel.rs index 728053ca0..06a71b46d 100644 --- a/litebox_broker_protocol/src/channel.rs +++ b/litebox_broker_protocol/src/channel.rs @@ -2,7 +2,8 @@ // Licensed under the MIT license. use crate::message::{ - BrokerHandshakeRequest, BrokerHandshakeResponse, BrokerRequest, BrokerResponse, + BrokerHandshakeRequest, BrokerHandshakeResponse, BrokerNotification, BrokerRequest, + BrokerResponse, }; /// Peer identity information supplied by the channel or host layer. @@ -85,3 +86,32 @@ pub trait HostControlChannel { /// Sends one active broker response. fn send_response(&mut self, response: &BrokerResponse) -> Result<(), Self::Error>; } + +/// Local-side receive channel for broker-initiated asynchronous notifications. +/// +/// A notification channel is separate from the control channel so active broker +/// requests remain strictly paired with their responses. The deployment is +/// responsible for binding this channel to the same authenticated broker +/// association as the matching control channel. +pub trait LocalNotificationChannel { + /// Channel-specific error type. + type Error; + + /// Receives one broker notification. + /// + /// Returns `Ok(None)` when the broker closed the channel cleanly before + /// starting another notification frame. + fn recv_notification(&mut self) -> Result, Self::Error>; +} + +/// Host-side send channel for broker-initiated asynchronous notifications. +/// +/// Implementations carry notification frames only; object operation responses +/// continue to use [`HostControlChannel::send_response`]. +pub trait HostNotificationChannel { + /// Channel-specific error type. + type Error; + + /// Sends one broker notification. + fn send_notification(&mut self, notification: &BrokerNotification) -> Result<(), Self::Error>; +} diff --git a/litebox_broker_protocol/src/message.rs b/litebox_broker_protocol/src/message.rs index 36c866637..bb51f177e 100644 --- a/litebox_broker_protocol/src/message.rs +++ b/litebox_broker_protocol/src/message.rs @@ -4,7 +4,7 @@ use crate::error::ErrorCode; use crate::event::{ AddEventRequest, AddEventResponse, ConsumeEventRequest, ConsumeEventResponse, - CreateEventRequest, CreateEventResponse, WaitEventRequest, WaitEventResponse, + CreateEventRequest, CreateEventResponse, ReadinessState, WaitEventRequest, WaitEventResponse, }; use crate::{ObjectHandle, ProtocolVersion}; @@ -84,3 +84,23 @@ pub enum EventResponse { /// Consume operation response. Consume(ConsumeEventResponse), } + +/// Broker-initiated asynchronous notification. +/// +/// Notifications are level-triggered snapshots and may be coalesced or +/// duplicated by a transport. Local waiters must treat them as wakeups to +/// re-check authoritative state, not as ordered state transitions. +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum BrokerNotification { + /// Readiness changed or should be re-checked for a broker-owned event object. + EventReadiness(EventReadinessNotification), +} + +/// Readiness notification for a broker-owned event object. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub struct EventReadinessNotification { + /// Event object handle. + pub handle: ObjectHandle, + /// Current broker-authoritative readiness snapshot. + pub readiness: ReadinessState, +} diff --git a/litebox_broker_protocol/src/wire.rs b/litebox_broker_protocol/src/wire.rs index 4e2ac0644..c9bb49a85 100644 --- a/litebox_broker_protocol/src/wire.rs +++ b/litebox_broker_protocol/src/wire.rs @@ -20,7 +20,8 @@ use thiserror::Error; use crate::error::ErrorCode; use crate::message::{ - BrokerHandshakeRequest, BrokerHandshakeResponse, BrokerRequest, BrokerResponse, + BrokerHandshakeRequest, BrokerHandshakeResponse, BrokerNotification, BrokerRequest, + BrokerResponse, EventReadinessNotification, }; use primitive::{Decoder, Encoder}; @@ -38,6 +39,8 @@ const RESPONSE_TAG_ERROR: u8 = 2; const RESPONSE_TAG_VERSION_MISMATCH: u8 = 3; const RESPONSE_TAG_OBJECT_CLOSED: u8 = 4; +const NOTIFICATION_TAG_EVENT_READINESS: u8 = 0; + /// Error produced while encoding or decoding a broker wire message. #[derive(Clone, Copy, Debug, Error, PartialEq, Eq)] #[non_exhaustive] @@ -208,6 +211,39 @@ pub fn decode_response(frame: &[u8]) -> Result { Ok(response) } +/// Encodes a broker notification body. +/// +/// Successful encodings are always non-empty because the first byte is the +/// message tag. +pub fn encode_notification(notification: BrokerNotification) -> Vec { + let mut encoder = Encoder::default(); + match notification { + BrokerNotification::EventReadiness(notification) => { + encoder.u8(NOTIFICATION_TAG_EVENT_READINESS); + encoder.handle(notification.handle); + event::encode_readiness(&mut encoder, notification.readiness); + } + } + encoder.finish() +} + +/// Decodes a broker notification body. +pub fn decode_notification(frame: &[u8]) -> Result { + let mut decoder = Decoder::new(frame); + let tag = decoder.u8()?; + let notification = match tag { + NOTIFICATION_TAG_EVENT_READINESS => { + BrokerNotification::EventReadiness(EventReadinessNotification { + handle: decoder.handle()?, + readiness: event::decode_readiness(&mut decoder)?, + }) + } + _ => return Err(WireError::InvalidTag), + }; + decoder.finish()?; + Ok(notification) +} + #[cfg(test)] mod tests { use super::*; @@ -329,6 +365,27 @@ mod tests { } } + #[test] + fn notification_codec_round_trips_all_variants() { + let handle = ObjectHandle(13); + let notifications = [BrokerNotification::EventReadiness( + EventReadinessNotification { + handle, + readiness: ReadinessState { + read_ready: true, + write_ready: false, + }, + }, + )]; + + for notification in notifications { + assert_eq!( + decode_notification(&encode_notification(notification.clone())).unwrap(), + notification + ); + } + } + #[test] fn decode_rejects_malformed_handshake_request_frames() { assert_eq!( @@ -461,6 +518,48 @@ mod tests { assert_eq!(decode_response(&frame), Err(WireError::TrailingBytes)); } + #[test] + fn decode_rejects_malformed_notification_frames() { + assert_eq!( + decode_notification(&[0xff, 1, 2, 3]), + Err(WireError::InvalidTag) + ); + assert_eq!( + decode_notification(&[NOTIFICATION_TAG_EVENT_READINESS]), + Err(WireError::TruncatedFrame) + ); + + let mut invalid_bool = encode_notification(BrokerNotification::EventReadiness( + EventReadinessNotification { + handle: ObjectHandle(13), + readiness: ReadinessState { + read_ready: true, + write_ready: false, + }, + }, + )); + *invalid_bool.last_mut().unwrap() = 0xff; + assert_eq!( + decode_notification(&invalid_bool), + Err(WireError::InvalidBoolean) + ); + + let mut trailing = encode_notification(BrokerNotification::EventReadiness( + EventReadinessNotification { + handle: ObjectHandle(13), + readiness: ReadinessState { + read_ready: true, + write_ready: false, + }, + }, + )); + trailing.push(0xff); + assert_eq!( + decode_notification(&trailing), + Err(WireError::TrailingBytes) + ); + } + #[test] fn event_add_response_wire_shape_is_pinned() { assert_eq!( @@ -475,4 +574,20 @@ mod tests { [1, 2, 1, 0] ); } + + #[test] + fn event_readiness_notification_wire_shape_is_pinned() { + assert_eq!( + encode_notification(BrokerNotification::EventReadiness( + EventReadinessNotification { + handle: ObjectHandle(13), + readiness: ReadinessState { + read_ready: true, + write_ready: false, + }, + } + )), + [0, 13, 0, 0, 0, 0, 0, 0, 0, 1, 0] + ); + } } diff --git a/litebox_broker_protocol/src/wire/event.rs b/litebox_broker_protocol/src/wire/event.rs index 7892f94cd..0810b03b7 100644 --- a/litebox_broker_protocol/src/wire/event.rs +++ b/litebox_broker_protocol/src/wire/event.rs @@ -114,12 +114,12 @@ pub(super) fn decode_event_response(decoder: &mut Decoder<'_>) -> Result) -> Result { +pub(super) fn decode_readiness(decoder: &mut Decoder<'_>) -> Result { Ok(ReadinessState { read_ready: decoder.bool()?, write_ready: decoder.bool()?, diff --git a/litebox_broker_transport/src/unix_socket.rs b/litebox_broker_transport/src/unix_socket.rs index 877507d5c..d02fe784d 100644 --- a/litebox_broker_transport/src/unix_socket.rs +++ b/litebox_broker_transport/src/unix_socket.rs @@ -13,15 +13,17 @@ use std::path::Path; use std::time::{Duration, Instant}; use litebox_broker_protocol::channel::{ - HostControlChannel, HostReceive, LocalControlChannel, PeerCredential, + HostControlChannel, HostNotificationChannel, HostReceive, LocalControlChannel, + LocalNotificationChannel, PeerCredential, }; use litebox_broker_protocol::message::{ - BrokerHandshakeRequest, BrokerHandshakeResponse, BrokerRequest, BrokerResponse, + BrokerHandshakeRequest, BrokerHandshakeResponse, BrokerNotification, BrokerRequest, + BrokerResponse, }; use litebox_broker_protocol::wire::{ - WireError, decode_handshake_request, decode_handshake_response, decode_request, - decode_response, encode_handshake_request, encode_handshake_response, encode_request, - encode_response, + WireError, decode_handshake_request, decode_handshake_response, decode_notification, + decode_request, decode_response, encode_handshake_request, encode_handshake_response, + encode_notification, encode_request, encode_response, }; const MAX_FRAME_LEN: usize = 64 * 1024; @@ -67,6 +69,16 @@ pub struct UnixStreamHostControlChannel { stream: UnixStream, } +/// Local-side Unix-domain-socket notification channel for the hosted userland POC. +pub struct UnixStreamLocalNotificationChannel { + stream: UnixStream, +} + +/// Host-side Unix-domain-socket notification channel for the hosted userland POC. +pub struct UnixStreamHostNotificationChannel { + stream: UnixStream, +} + impl UnixStreamHostControlChannel { /// Creates a host control channel from an accepted Unix stream. pub const fn from_accepted(stream: UnixStream) -> Self { @@ -74,6 +86,25 @@ impl UnixStreamHostControlChannel { } } +impl UnixStreamLocalNotificationChannel { + /// Creates a local notification channel from an already-connected Unix stream. + pub const fn from_connected(stream: UnixStream) -> Self { + Self { stream } + } + + /// Connects to a userland broker Unix notification socket. + pub fn connect(path: impl AsRef) -> IoResult { + UnixStream::connect(path).map(Self::from_connected) + } +} + +impl UnixStreamHostNotificationChannel { + /// Creates a host notification channel from an accepted Unix stream. + pub const fn from_accepted(stream: UnixStream) -> Self { + Self { stream } + } +} + impl LocalControlChannel for UnixStreamLocalControlChannel { type Error = Error; @@ -153,6 +184,29 @@ impl HostControlChannel for UnixStreamHostControlChannel { } } +impl LocalNotificationChannel for UnixStreamLocalNotificationChannel { + type Error = Error; + + fn recv_notification(&mut self) -> IoResult> { + match read_frame_with_deadline(&mut self.stream, None)? { + Some(frame) => decode_notification(&frame).map(Some).map_err(wire_error), + None => Ok(None), + } + } +} + +impl HostNotificationChannel for UnixStreamHostNotificationChannel { + type Error = Error; + + fn send_notification(&mut self, notification: &BrokerNotification) -> IoResult<()> { + write_frame_with_deadline( + &mut self.stream, + &encode_notification(notification.clone()), + None, + ) + } +} + fn read_frame_with_deadline( stream: &mut UnixStream, deadline: Option, @@ -383,4 +437,24 @@ mod tests { HostReceive::ProtocolViolation ); } + + #[test] + fn notification_frame_round_trip() { + let (local_stream, host_stream) = UnixStream::pair().unwrap(); + let mut local = UnixStreamLocalNotificationChannel::from_connected(local_stream); + let mut host = UnixStreamHostNotificationChannel::from_accepted(host_stream); + let notification = BrokerNotification::EventReadiness( + litebox_broker_protocol::message::EventReadinessNotification { + handle: litebox_broker_protocol::ObjectHandle(7), + readiness: litebox_broker_protocol::event::ReadinessState { + read_ready: true, + write_ready: false, + }, + }, + ); + + host.send_notification(¬ification).unwrap(); + + assert_eq!(local.recv_notification().unwrap(), Some(notification)); + } } From f017700e7d8d14f915fbeca56873c67bb782ac8a Mon Sep 17 00:00:00 2001 From: Weidong Cui Date: Fri, 26 Jun 2026 13:31:57 -0700 Subject: [PATCH 2/6] Test notification channel end to end Extend the userland broker integration test with a real Unix-socket notification path between the broker-side test helper and fake runner. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../tests/userland_broker.rs | 98 ++++++++++++++++++- 1 file changed, 95 insertions(+), 3 deletions(-) diff --git a/litebox_broker_userland/tests/userland_broker.rs b/litebox_broker_userland/tests/userland_broker.rs index 6bc055aed..9207ab498 100644 --- a/litebox_broker_userland/tests/userland_broker.rs +++ b/litebox_broker_userland/tests/userland_broker.rs @@ -3,16 +3,24 @@ use std::ffi::{OsStr, OsString}; use std::io::{Error, ErrorKind, Result}; +use std::os::unix::net::UnixListener; use std::os::unix::process::ExitStatusExt; -use std::path::Path; +use std::path::{Path, PathBuf}; use std::process::{Child, Command}; use std::time::{Duration, Instant}; use litebox_broker_local::BrokerLocal; +use litebox_broker_protocol::ObjectHandle; +use litebox_broker_protocol::channel::{HostNotificationChannel, LocalNotificationChannel}; use litebox_broker_protocol::event::ReadinessState; -use litebox_broker_transport::unix_socket::UnixStreamLocalControlChannel; +use litebox_broker_protocol::message::{BrokerNotification, EventReadinessNotification}; +use litebox_broker_transport::unix_socket::{ + UnixStreamHostNotificationChannel, UnixStreamLocalControlChannel, + UnixStreamLocalNotificationChannel, +}; const RUNNER_ARGUMENT: &str = "broker-userland-test-runner"; +const NOTIFICATION_SOCKET_ARGUMENT: &str = "--notification-socket"; fn main() { let args = std::env::args_os().skip(1).collect::>(); @@ -34,11 +42,15 @@ fn run_parent_test() { // After the fake runner finishes its broker requests, it terminates the broker // parent process; this lets the test exercise the long-running broker without a // test-only shutdown path. + let notification_socket_path = unique_socket_path("notification"); + let notification_sender = spawn_notification_sender(notification_socket_path.clone()); let mut broker = ChildGuard { child: Command::new(env!("CARGO_BIN_EXE_litebox-broker-userland")) .arg("--runner") .arg(std::env::current_exe().unwrap()) .arg(RUNNER_ARGUMENT) + .arg(NOTIFICATION_SOCKET_ARGUMENT) + .arg(¬ification_socket_path) .spawn() .unwrap(), }; @@ -47,10 +59,14 @@ fn run_parent_test() { while Instant::now() < deadline { if let Some(status) = broker.child.try_wait().unwrap() { assert_eq!(status.signal(), Some(libc::SIGTERM)); + notification_sender.join().unwrap(); + let _ = std::fs::remove_file(¬ification_socket_path); return; } std::thread::sleep(Duration::from_millis(10)); } + notification_sender.join().unwrap(); + let _ = std::fs::remove_file(¬ification_socket_path); panic!("timed out waiting for broker to stop"); } @@ -67,7 +83,11 @@ fn run_fake_runner(args: &[OsString]) { args.get(3).map(OsString::as_os_str), Some(OsStr::new(RUNNER_ARGUMENT)) ); - assert_eq!(args.len(), 4, "unexpected runner arguments: {args:?}"); + assert_eq!( + args.get(4).map(OsString::as_os_str), + Some(OsStr::new(NOTIFICATION_SOCKET_ARGUMENT)) + ); + assert_eq!(args.len(), 6, "unexpected runner arguments: {args:?}"); let socket_path = args.get(2).unwrap(); let channel = connect_with_retry(Path::new(socket_path)).unwrap(); @@ -99,6 +119,14 @@ fn run_fake_runner(args: &[OsString]) { ); drop(local); + let notification_socket_path = args.get(5).unwrap(); + let mut notifications = connect_notification_with_retry(Path::new(notification_socket_path)) + .expect("failed to connect notification channel"); + assert_eq!( + notifications.recv_notification().unwrap(), + Some(expected_notification()) + ); + // SAFETY: `getppid` takes no pointer arguments and has no Rust-side aliasing requirements. let broker_pid = unsafe { libc::getppid() }; // SAFETY: `broker_pid` is the runner's parent process and `SIGTERM` is a valid signal number. @@ -111,6 +139,39 @@ fn run_fake_runner(args: &[OsString]) { ); } +fn spawn_notification_sender(socket_path: PathBuf) -> std::thread::JoinHandle<()> { + let _ = std::fs::remove_file(&socket_path); + let listener = UnixListener::bind(&socket_path).unwrap(); + listener.set_nonblocking(true).unwrap(); + + std::thread::spawn(move || { + let deadline = Instant::now() + Duration::from_secs(5); + let stream = loop { + match listener.accept() { + Ok((stream, _)) => break stream, + Err(error) + if error.kind() == ErrorKind::WouldBlock && Instant::now() < deadline => + { + std::thread::sleep(Duration::from_millis(10)); + } + Err(error) => panic!("failed to accept notification connection: {error}"), + } + }; + let mut channel = UnixStreamHostNotificationChannel::from_accepted(stream); + channel.send_notification(&expected_notification()).unwrap(); + }) +} + +fn expected_notification() -> BrokerNotification { + BrokerNotification::EventReadiness(EventReadinessNotification { + handle: ObjectHandle(7), + readiness: ReadinessState { + read_ready: true, + write_ready: false, + }, + }) +} + struct ChildGuard { child: Child, } @@ -141,3 +202,34 @@ fn connect_with_retry(socket_path: &Path) -> Result Result { + let deadline = Instant::now() + Duration::from_secs(5); + loop { + match UnixStreamLocalNotificationChannel::connect(socket_path) { + Ok(channel) => return Ok(channel), + Err(error) if Instant::now() < deadline => { + if error.kind() != ErrorKind::NotFound + && error.kind() != ErrorKind::ConnectionRefused + { + return Err(error); + } + std::thread::sleep(Duration::from_millis(10)); + } + Err(error) => return Err(error), + } + } +} + +fn unique_socket_path(name: &str) -> PathBuf { + let nonce = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_nanos(); + std::env::temp_dir().join(format!( + "litebox-broker-userland-{name}-{}-{nonce}.sock", + std::process::id() + )) +} From 626aed11778bcf7f0a1b01dabe82799a47ef298f Mon Sep 17 00:00:00 2001 From: Weidong Cui Date: Fri, 26 Jun 2026 13:46:04 -0700 Subject: [PATCH 3/6] Test broker-stack notification channel Replace the standalone notification sender with a broker-stack smoke test that sends readiness for the actual event handle observed through serve_connection. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../tests/notification_channel.rs | 231 ++++++++++++++++++ .../tests/userland_broker.rs | 98 +------- 2 files changed, 234 insertions(+), 95 deletions(-) create mode 100644 litebox_broker_userland/tests/notification_channel.rs diff --git a/litebox_broker_userland/tests/notification_channel.rs b/litebox_broker_userland/tests/notification_channel.rs new file mode 100644 index 000000000..9099c02e5 --- /dev/null +++ b/litebox_broker_userland/tests/notification_channel.rs @@ -0,0 +1,231 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +use std::io::{Error, ErrorKind, Result}; +use std::os::unix::net::{UnixListener, UnixStream}; +use std::path::{Path, PathBuf}; +use std::time::{Duration, Instant}; + +use litebox_broker_core::{BrokerCore, PolicyEngine, PrincipalRights}; +use litebox_broker_host::{ConnectionTermination, serve_connection}; +use litebox_broker_local::BrokerLocal; +use litebox_broker_protocol::ObjectHandle; +use litebox_broker_protocol::channel::{ + HostControlChannel, HostNotificationChannel, HostReceive, LocalNotificationChannel, + PeerCredential, +}; +use litebox_broker_protocol::message::{ + BrokerHandshakeRequest, BrokerHandshakeResponse, BrokerNotification, BrokerRequest, + BrokerResponse, EventReadinessNotification, EventRequest, EventResponse, +}; +use litebox_broker_transport::unix_socket::{ + UnixStreamHostControlChannel, UnixStreamHostNotificationChannel, UnixStreamLocalControlChannel, + UnixStreamLocalNotificationChannel, +}; + +const TEST_TIMEOUT: Duration = Duration::from_secs(5); + +#[test] +fn broker_sends_event_readiness_notification_over_userland_channel() { + let control_socket_path = unique_socket_path("control"); + let notification_socket_path = unique_socket_path("notification"); + let broker = spawn_test_broker( + control_socket_path.clone(), + notification_socket_path.clone(), + ); + + let mut notifications = connect_notification_with_retry(¬ification_socket_path).unwrap(); + let control_channel = connect_control_with_retry(&control_socket_path).unwrap(); + let mut local = BrokerLocal::negotiate(control_channel).unwrap(); + + let handle = local.create_event_with_count(0).unwrap(); + let readiness = local.add_event(handle, 1).unwrap(); + assert_eq!( + notifications.recv_notification().unwrap(), + Some(BrokerNotification::EventReadiness( + EventReadinessNotification { handle, readiness } + )) + ); + + drop(local); + broker.join(); +} + +fn spawn_test_broker( + control_socket_path: PathBuf, + notification_socket_path: PathBuf, +) -> TestBroker { + let _ = std::fs::remove_file(&control_socket_path); + let _ = std::fs::remove_file(¬ification_socket_path); + let (ready_tx, ready_rx) = std::sync::mpsc::channel(); + let thread_control_socket_path = control_socket_path.clone(); + let thread_notification_socket_path = notification_socket_path.clone(); + let thread = std::thread::spawn(move || { + let control_listener = UnixListener::bind(&thread_control_socket_path).unwrap(); + let notification_listener = UnixListener::bind(&thread_notification_socket_path).unwrap(); + control_listener.set_nonblocking(true).unwrap(); + notification_listener.set_nonblocking(true).unwrap(); + ready_tx.send(()).unwrap(); + + let notification_stream = accept_with_retry(¬ification_listener).unwrap(); + let control_stream = accept_with_retry(&control_listener).unwrap(); + let broker = BrokerCore::new(PolicyEngine::with_unauthenticated_rights( + PrincipalRights::all(), + )) + .unwrap(); + let mut channel = NotifyingHostControlChannel { + inner: UnixStreamHostControlChannel::from_accepted(control_stream), + notifications: UnixStreamHostNotificationChannel::from_accepted(notification_stream), + pending_add_handle: None, + }; + + assert_eq!( + serve_connection(&broker, &mut channel).unwrap(), + ConnectionTermination::PeerClosed + ); + }); + + ready_rx + .recv_timeout(TEST_TIMEOUT) + .expect("broker test host did not start"); + TestBroker { + thread: Some(thread), + control_socket_path, + notification_socket_path, + } +} + +struct TestBroker { + thread: Option>, + control_socket_path: PathBuf, + notification_socket_path: PathBuf, +} + +impl TestBroker { + fn join(mut self) { + self.thread + .take() + .expect("broker test host thread missing") + .join() + .expect("broker test host panicked"); + } + + fn cleanup(&self) { + let _ = std::fs::remove_file(&self.control_socket_path); + let _ = std::fs::remove_file(&self.notification_socket_path); + } +} + +impl Drop for TestBroker { + fn drop(&mut self) { + self.cleanup(); + } +} + +struct NotifyingHostControlChannel { + inner: UnixStreamHostControlChannel, + notifications: UnixStreamHostNotificationChannel, + pending_add_handle: Option, +} + +impl HostControlChannel for NotifyingHostControlChannel { + type Error = Error; + + fn peer_credential(&self) -> Result { + self.inner.peer_credential() + } + + fn recv_handshake_request(&mut self) -> Result> { + self.inner.recv_handshake_request() + } + + fn send_handshake_response(&mut self, response: &BrokerHandshakeResponse) -> Result<()> { + self.inner.send_handshake_response(response) + } + + fn recv_request(&mut self) -> Result> { + let request = self.inner.recv_request()?; + self.pending_add_handle = match &request { + HostReceive::Message(BrokerRequest::Event(EventRequest::Add(request))) => { + Some(request.handle) + } + _ => None, + }; + Ok(request) + } + + fn send_response(&mut self, response: &BrokerResponse) -> Result<()> { + if let (Some(handle), BrokerResponse::Event(EventResponse::Add(response))) = + (self.pending_add_handle.take(), response) + { + self.notifications + .send_notification(&BrokerNotification::EventReadiness( + EventReadinessNotification { + handle, + readiness: response.readiness, + }, + ))?; + } + self.inner.send_response(response) + } +} + +fn connect_control_with_retry(socket_path: &Path) -> Result { + let deadline = Instant::now() + TEST_TIMEOUT; + loop { + match UnixStreamLocalControlChannel::connect_with_setup_deadline(socket_path, deadline) { + Ok(channel) => return Ok(channel), + Err(error) if retry_socket_connect(&error, deadline) => { + std::thread::sleep(Duration::from_millis(10)); + } + Err(error) => return Err(error), + } + } +} + +fn connect_notification_with_retry( + socket_path: &Path, +) -> Result { + let deadline = Instant::now() + TEST_TIMEOUT; + loop { + match UnixStreamLocalNotificationChannel::connect(socket_path) { + Ok(channel) => return Ok(channel), + Err(error) if retry_socket_connect(&error, deadline) => { + std::thread::sleep(Duration::from_millis(10)); + } + Err(error) => return Err(error), + } + } +} + +fn retry_socket_connect(error: &Error, deadline: Instant) -> bool { + Instant::now() < deadline + && matches!( + error.kind(), + ErrorKind::NotFound | ErrorKind::ConnectionRefused + ) +} + +fn accept_with_retry(listener: &UnixListener) -> Result { + let deadline = Instant::now() + TEST_TIMEOUT; + loop { + match listener.accept() { + Ok((stream, _)) => return Ok(stream), + Err(error) if error.kind() == ErrorKind::WouldBlock && Instant::now() < deadline => { + std::thread::sleep(Duration::from_millis(10)); + } + Err(error) => return Err(error), + } + } +} + +fn unique_socket_path(name: &str) -> PathBuf { + let nonce = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_nanos(); + std::env::temp_dir().join(format!( + "litebox-broker-userland-notification-channel-{name}-{}-{nonce}.sock", + std::process::id() + )) +} diff --git a/litebox_broker_userland/tests/userland_broker.rs b/litebox_broker_userland/tests/userland_broker.rs index 9207ab498..6bc055aed 100644 --- a/litebox_broker_userland/tests/userland_broker.rs +++ b/litebox_broker_userland/tests/userland_broker.rs @@ -3,24 +3,16 @@ use std::ffi::{OsStr, OsString}; use std::io::{Error, ErrorKind, Result}; -use std::os::unix::net::UnixListener; use std::os::unix::process::ExitStatusExt; -use std::path::{Path, PathBuf}; +use std::path::Path; use std::process::{Child, Command}; use std::time::{Duration, Instant}; use litebox_broker_local::BrokerLocal; -use litebox_broker_protocol::ObjectHandle; -use litebox_broker_protocol::channel::{HostNotificationChannel, LocalNotificationChannel}; use litebox_broker_protocol::event::ReadinessState; -use litebox_broker_protocol::message::{BrokerNotification, EventReadinessNotification}; -use litebox_broker_transport::unix_socket::{ - UnixStreamHostNotificationChannel, UnixStreamLocalControlChannel, - UnixStreamLocalNotificationChannel, -}; +use litebox_broker_transport::unix_socket::UnixStreamLocalControlChannel; const RUNNER_ARGUMENT: &str = "broker-userland-test-runner"; -const NOTIFICATION_SOCKET_ARGUMENT: &str = "--notification-socket"; fn main() { let args = std::env::args_os().skip(1).collect::>(); @@ -42,15 +34,11 @@ fn run_parent_test() { // After the fake runner finishes its broker requests, it terminates the broker // parent process; this lets the test exercise the long-running broker without a // test-only shutdown path. - let notification_socket_path = unique_socket_path("notification"); - let notification_sender = spawn_notification_sender(notification_socket_path.clone()); let mut broker = ChildGuard { child: Command::new(env!("CARGO_BIN_EXE_litebox-broker-userland")) .arg("--runner") .arg(std::env::current_exe().unwrap()) .arg(RUNNER_ARGUMENT) - .arg(NOTIFICATION_SOCKET_ARGUMENT) - .arg(¬ification_socket_path) .spawn() .unwrap(), }; @@ -59,14 +47,10 @@ fn run_parent_test() { while Instant::now() < deadline { if let Some(status) = broker.child.try_wait().unwrap() { assert_eq!(status.signal(), Some(libc::SIGTERM)); - notification_sender.join().unwrap(); - let _ = std::fs::remove_file(¬ification_socket_path); return; } std::thread::sleep(Duration::from_millis(10)); } - notification_sender.join().unwrap(); - let _ = std::fs::remove_file(¬ification_socket_path); panic!("timed out waiting for broker to stop"); } @@ -83,11 +67,7 @@ fn run_fake_runner(args: &[OsString]) { args.get(3).map(OsString::as_os_str), Some(OsStr::new(RUNNER_ARGUMENT)) ); - assert_eq!( - args.get(4).map(OsString::as_os_str), - Some(OsStr::new(NOTIFICATION_SOCKET_ARGUMENT)) - ); - assert_eq!(args.len(), 6, "unexpected runner arguments: {args:?}"); + assert_eq!(args.len(), 4, "unexpected runner arguments: {args:?}"); let socket_path = args.get(2).unwrap(); let channel = connect_with_retry(Path::new(socket_path)).unwrap(); @@ -119,14 +99,6 @@ fn run_fake_runner(args: &[OsString]) { ); drop(local); - let notification_socket_path = args.get(5).unwrap(); - let mut notifications = connect_notification_with_retry(Path::new(notification_socket_path)) - .expect("failed to connect notification channel"); - assert_eq!( - notifications.recv_notification().unwrap(), - Some(expected_notification()) - ); - // SAFETY: `getppid` takes no pointer arguments and has no Rust-side aliasing requirements. let broker_pid = unsafe { libc::getppid() }; // SAFETY: `broker_pid` is the runner's parent process and `SIGTERM` is a valid signal number. @@ -139,39 +111,6 @@ fn run_fake_runner(args: &[OsString]) { ); } -fn spawn_notification_sender(socket_path: PathBuf) -> std::thread::JoinHandle<()> { - let _ = std::fs::remove_file(&socket_path); - let listener = UnixListener::bind(&socket_path).unwrap(); - listener.set_nonblocking(true).unwrap(); - - std::thread::spawn(move || { - let deadline = Instant::now() + Duration::from_secs(5); - let stream = loop { - match listener.accept() { - Ok((stream, _)) => break stream, - Err(error) - if error.kind() == ErrorKind::WouldBlock && Instant::now() < deadline => - { - std::thread::sleep(Duration::from_millis(10)); - } - Err(error) => panic!("failed to accept notification connection: {error}"), - } - }; - let mut channel = UnixStreamHostNotificationChannel::from_accepted(stream); - channel.send_notification(&expected_notification()).unwrap(); - }) -} - -fn expected_notification() -> BrokerNotification { - BrokerNotification::EventReadiness(EventReadinessNotification { - handle: ObjectHandle(7), - readiness: ReadinessState { - read_ready: true, - write_ready: false, - }, - }) -} - struct ChildGuard { child: Child, } @@ -202,34 +141,3 @@ fn connect_with_retry(socket_path: &Path) -> Result Result { - let deadline = Instant::now() + Duration::from_secs(5); - loop { - match UnixStreamLocalNotificationChannel::connect(socket_path) { - Ok(channel) => return Ok(channel), - Err(error) if Instant::now() < deadline => { - if error.kind() != ErrorKind::NotFound - && error.kind() != ErrorKind::ConnectionRefused - { - return Err(error); - } - std::thread::sleep(Duration::from_millis(10)); - } - Err(error) => return Err(error), - } - } -} - -fn unique_socket_path(name: &str) -> PathBuf { - let nonce = std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap() - .as_nanos(); - std::env::temp_dir().join(format!( - "litebox-broker-userland-{name}-{}-{nonce}.sock", - std::process::id() - )) -} From e01620a694c06e549678b15d94006a40b4f9aa94 Mon Sep 17 00:00:00 2001 From: Weidong Cui Date: Fri, 26 Jun 2026 14:00:03 -0700 Subject: [PATCH 4/6] Bound notification smoke test receive Set a read timeout on the notification socket so the broker-stack smoke test fails instead of hanging if readiness notification delivery regresses. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- litebox_broker_userland/tests/notification_channel.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/litebox_broker_userland/tests/notification_channel.rs b/litebox_broker_userland/tests/notification_channel.rs index 9099c02e5..51b100707 100644 --- a/litebox_broker_userland/tests/notification_channel.rs +++ b/litebox_broker_userland/tests/notification_channel.rs @@ -188,8 +188,11 @@ fn connect_notification_with_retry( ) -> Result { let deadline = Instant::now() + TEST_TIMEOUT; loop { - match UnixStreamLocalNotificationChannel::connect(socket_path) { - Ok(channel) => return Ok(channel), + match UnixStream::connect(socket_path) { + Ok(stream) => { + stream.set_read_timeout(Some(TEST_TIMEOUT))?; + return Ok(UnixStreamLocalNotificationChannel::from_connected(stream)); + } Err(error) if retry_socket_connect(&error, deadline) => { std::thread::sleep(Duration::from_millis(10)); } From 019097abfa4d404d8c86307dd5c22bf86c13297b Mon Sep 17 00:00:00 2001 From: Weidong Cui Date: Fri, 26 Jun 2026 16:01:27 -0700 Subject: [PATCH 5/6] Move readiness notification serving into host Add a production broker-host entry point that serves control requests with a paired notification channel and emits readiness notifications for state-changing event requests. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- litebox_broker_host/src/lib.rs | 135 ++++++++++++++++-- .../tests/notification_channel.rs | 76 ++-------- 2 files changed, 138 insertions(+), 73 deletions(-) diff --git a/litebox_broker_host/src/lib.rs b/litebox_broker_host/src/lib.rs index 20fd3cb3d..4e6d0d24e 100644 --- a/litebox_broker_host/src/lib.rs +++ b/litebox_broker_host/src/lib.rs @@ -3,9 +3,9 @@ //! Channel-neutral broker-side protocol/core adapter. //! -//! This crate wires `litebox_broker_core` to any implementation of the neutral -//! host-side control-channel trait. Concrete channels live in separate crates such as -//! `litebox_broker_transport`. +//! This crate wires `litebox_broker_core` to implementations of the neutral +//! host-side control and notification channel traits. Concrete channels live in +//! separate crates such as `litebox_broker_transport`. #![no_std] @@ -13,13 +13,16 @@ extern crate std; use litebox_broker_core::{BrokerCore, BrokerSession, CallerCredential}; -use litebox_broker_protocol::BROKER_PROTOCOL_VERSION; -use litebox_broker_protocol::channel::{HostControlChannel, HostReceive, PeerCredential}; +use litebox_broker_protocol::channel::{ + HostControlChannel, HostNotificationChannel, HostReceive, PeerCredential, +}; use litebox_broker_protocol::error::ErrorCode; use litebox_broker_protocol::event::{AddEventResponse, CreateEventResponse, WaitEventResponse}; use litebox_broker_protocol::message::{ - BrokerHandshakeResponse, BrokerRequest, BrokerResponse, EventRequest, EventResponse, + BrokerHandshakeResponse, BrokerNotification, BrokerRequest, BrokerResponse, + EventReadinessNotification, EventRequest, EventResponse, }; +use litebox_broker_protocol::{BROKER_PROTOCOL_VERSION, ObjectHandle}; mod error; @@ -30,6 +33,41 @@ pub fn serve_connection( core: &BrokerCore, channel: &mut Channel, ) -> Result +where + Channel: HostControlChannel, +{ + match negotiate_connection(core, channel)? { + NegotiatedConnection::Active(session) => serve_request_loop(channel, &session), + NegotiatedConnection::Terminated(termination) => Ok(termination), + } +} + +/// Authenticates, negotiates, and serves one broker connection with notifications. +/// +/// Control requests and responses remain strictly paired on the control channel. +/// Readiness notifications are sent over the separate notification channel after +/// state-changing event requests complete. +pub fn serve_connection_with_notifications( + core: &BrokerCore, + control_channel: &mut ControlChannel, + notification_channel: &mut NotificationChannel, +) -> Result +where + ControlChannel: HostControlChannel, + NotificationChannel: HostNotificationChannel, +{ + match negotiate_connection(core, control_channel)? { + NegotiatedConnection::Active(session) => { + serve_request_loop_with_notifications(control_channel, &session, notification_channel) + } + NegotiatedConnection::Terminated(termination) => Ok(termination), + } +} + +fn negotiate_connection( + core: &BrokerCore, + channel: &mut Channel, +) -> Result where Channel: HostControlChannel, { @@ -54,9 +92,15 @@ where ErrorCode::ProtocolState, )) .map_err(BrokerHostError::Channel)?; - return Ok(ConnectionTermination::ProtocolViolation); + return Ok(NegotiatedConnection::Terminated( + ConnectionTermination::ProtocolViolation, + )); + } + HostReceive::PeerClosed => { + return Ok(NegotiatedConnection::Terminated( + ConnectionTermination::PeerClosed, + )); } - HostReceive::PeerClosed => return Ok(ConnectionTermination::PeerClosed), }; let negotiated = request.protocol_version == BROKER_PROTOCOL_VERSION; @@ -77,7 +121,7 @@ where } } - serve_request_loop(channel, &session) + Ok(NegotiatedConnection::Active(session)) } fn serve_request_loop( @@ -108,6 +152,45 @@ where Ok(ConnectionTermination::PeerClosed) } +fn serve_request_loop_with_notifications( + control_channel: &mut ControlChannel, + session: &BrokerSession, + notification_channel: &mut NotificationChannel, +) -> Result +where + ControlChannel: HostControlChannel, + NotificationChannel: HostNotificationChannel, +{ + loop { + let request = match control_channel + .recv_request() + .map_err(BrokerHostError::Channel)? + { + HostReceive::Message(request) => request, + HostReceive::ProtocolViolation => { + control_channel + .send_response(&BrokerResponse::Error(ErrorCode::ProtocolState)) + .map_err(BrokerHostError::Channel)?; + return Ok(ConnectionTermination::ProtocolViolation); + } + HostReceive::PeerClosed => break, + }; + + let readiness_handle = event_readiness_notification_handle(&request); + let response = handle_request(session, request); + control_channel + .send_response(&response) + .map_err(BrokerHostError::Channel)?; + if let Some(notification) = event_readiness_notification(readiness_handle, &response) { + notification_channel + .send_notification(¬ification) + .map_err(BrokerHostError::Channel)?; + } + } + + Ok(ConnectionTermination::PeerClosed) +} + fn handle_request(session: &BrokerSession, request: BrokerRequest) -> BrokerResponse { match request { BrokerRequest::CloseObject(handle) => match session.close_object_reference(handle) { @@ -153,6 +236,40 @@ fn handle_event_request(session: &BrokerSession, request: EventRequest) -> Broke } } +fn event_readiness_notification_handle(request: &BrokerRequest) -> Option { + match request { + BrokerRequest::Event(EventRequest::Add(request)) => Some(request.handle), + BrokerRequest::Event(EventRequest::Consume(request)) => Some(request.handle), + _ => None, + } +} + +fn event_readiness_notification( + handle: Option, + response: &BrokerResponse, +) -> Option { + match (handle, response) { + (Some(handle), BrokerResponse::Event(EventResponse::Add(response))) => Some( + BrokerNotification::EventReadiness(EventReadinessNotification { + handle, + readiness: response.readiness, + }), + ), + (Some(handle), BrokerResponse::Event(EventResponse::Consume(response))) => Some( + BrokerNotification::EventReadiness(EventReadinessNotification { + handle, + readiness: response.readiness, + }), + ), + _ => None, + } +} + +enum NegotiatedConnection { + Active(BrokerSession), + Terminated(ConnectionTermination), +} + /// Terminal outcome after processing one broker connection. #[derive(Clone, Copy, Debug, PartialEq, Eq)] #[non_exhaustive] diff --git a/litebox_broker_userland/tests/notification_channel.rs b/litebox_broker_userland/tests/notification_channel.rs index 51b100707..6c4104d68 100644 --- a/litebox_broker_userland/tests/notification_channel.rs +++ b/litebox_broker_userland/tests/notification_channel.rs @@ -7,17 +7,10 @@ use std::path::{Path, PathBuf}; use std::time::{Duration, Instant}; use litebox_broker_core::{BrokerCore, PolicyEngine, PrincipalRights}; -use litebox_broker_host::{ConnectionTermination, serve_connection}; +use litebox_broker_host::{ConnectionTermination, serve_connection_with_notifications}; use litebox_broker_local::BrokerLocal; -use litebox_broker_protocol::ObjectHandle; -use litebox_broker_protocol::channel::{ - HostControlChannel, HostNotificationChannel, HostReceive, LocalNotificationChannel, - PeerCredential, -}; -use litebox_broker_protocol::message::{ - BrokerHandshakeRequest, BrokerHandshakeResponse, BrokerNotification, BrokerRequest, - BrokerResponse, EventReadinessNotification, EventRequest, EventResponse, -}; +use litebox_broker_protocol::channel::LocalNotificationChannel; +use litebox_broker_protocol::message::{BrokerNotification, EventReadinessNotification}; use litebox_broker_transport::unix_socket::{ UnixStreamHostControlChannel, UnixStreamHostNotificationChannel, UnixStreamLocalControlChannel, UnixStreamLocalNotificationChannel, @@ -73,14 +66,17 @@ fn spawn_test_broker( PrincipalRights::all(), )) .unwrap(); - let mut channel = NotifyingHostControlChannel { - inner: UnixStreamHostControlChannel::from_accepted(control_stream), - notifications: UnixStreamHostNotificationChannel::from_accepted(notification_stream), - pending_add_handle: None, - }; + let mut control_channel = UnixStreamHostControlChannel::from_accepted(control_stream); + let mut notification_channel = + UnixStreamHostNotificationChannel::from_accepted(notification_stream); assert_eq!( - serve_connection(&broker, &mut channel).unwrap(), + serve_connection_with_notifications( + &broker, + &mut control_channel, + &mut notification_channel, + ) + .unwrap(), ConnectionTermination::PeerClosed ); }); @@ -122,54 +118,6 @@ impl Drop for TestBroker { } } -struct NotifyingHostControlChannel { - inner: UnixStreamHostControlChannel, - notifications: UnixStreamHostNotificationChannel, - pending_add_handle: Option, -} - -impl HostControlChannel for NotifyingHostControlChannel { - type Error = Error; - - fn peer_credential(&self) -> Result { - self.inner.peer_credential() - } - - fn recv_handshake_request(&mut self) -> Result> { - self.inner.recv_handshake_request() - } - - fn send_handshake_response(&mut self, response: &BrokerHandshakeResponse) -> Result<()> { - self.inner.send_handshake_response(response) - } - - fn recv_request(&mut self) -> Result> { - let request = self.inner.recv_request()?; - self.pending_add_handle = match &request { - HostReceive::Message(BrokerRequest::Event(EventRequest::Add(request))) => { - Some(request.handle) - } - _ => None, - }; - Ok(request) - } - - fn send_response(&mut self, response: &BrokerResponse) -> Result<()> { - if let (Some(handle), BrokerResponse::Event(EventResponse::Add(response))) = - (self.pending_add_handle.take(), response) - { - self.notifications - .send_notification(&BrokerNotification::EventReadiness( - EventReadinessNotification { - handle, - readiness: response.readiness, - }, - ))?; - } - self.inner.send_response(response) - } -} - fn connect_control_with_retry(socket_path: &Path) -> Result { let deadline = Instant::now() + TEST_TIMEOUT; loop { From 5c78e6e01731681bd766e1ea3b77fc3363035dd0 Mon Sep 17 00:00:00 2001 From: Weidong Cui Date: Fri, 26 Jun 2026 16:18:43 -0700 Subject: [PATCH 6/6] Simplify broker host negotiation flow Remove the private NegotiatedConnection enum and use a small handshake helper that returns an early termination only when negotiation does not reach the active request loop. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- litebox_broker_host/src/lib.rs | 48 ++++++++++++++++------------------ 1 file changed, 22 insertions(+), 26 deletions(-) diff --git a/litebox_broker_host/src/lib.rs b/litebox_broker_host/src/lib.rs index 4e6d0d24e..c4f311c2e 100644 --- a/litebox_broker_host/src/lib.rs +++ b/litebox_broker_host/src/lib.rs @@ -36,10 +36,11 @@ pub fn serve_connection( where Channel: HostControlChannel, { - match negotiate_connection(core, channel)? { - NegotiatedConnection::Active(session) => serve_request_loop(channel, &session), - NegotiatedConnection::Terminated(termination) => Ok(termination), + let session = create_session(core, channel)?; + if let Some(termination) = negotiate_protocol(channel)? { + return Ok(termination); } + serve_request_loop(channel, &session) } /// Authenticates, negotiates, and serves one broker connection with notifications. @@ -56,18 +57,17 @@ where ControlChannel: HostControlChannel, NotificationChannel: HostNotificationChannel, { - match negotiate_connection(core, control_channel)? { - NegotiatedConnection::Active(session) => { - serve_request_loop_with_notifications(control_channel, &session, notification_channel) - } - NegotiatedConnection::Terminated(termination) => Ok(termination), + let session = create_session(core, control_channel)?; + if let Some(termination) = negotiate_protocol(control_channel)? { + return Ok(termination); } + serve_request_loop_with_notifications(control_channel, &session, notification_channel) } -fn negotiate_connection( +fn create_session( core: &BrokerCore, - channel: &mut Channel, -) -> Result + channel: &Channel, +) -> Result where Channel: HostControlChannel, { @@ -78,8 +78,15 @@ where PeerCredential::Unauthenticated => CallerCredential::Unauthenticated, _ => return Err(BrokerHostError::Broker(ErrorCode::PolicyDenied)), }; - let session = core.create_session(caller_credential)?; + Ok(core.create_session(caller_credential)?) +} +fn negotiate_protocol( + channel: &mut Channel, +) -> Result, Channel::Error> +where + Channel: HostControlChannel, +{ loop { let request = match channel .recv_handshake_request() @@ -92,15 +99,9 @@ where ErrorCode::ProtocolState, )) .map_err(BrokerHostError::Channel)?; - return Ok(NegotiatedConnection::Terminated( - ConnectionTermination::ProtocolViolation, - )); - } - HostReceive::PeerClosed => { - return Ok(NegotiatedConnection::Terminated( - ConnectionTermination::PeerClosed, - )); + return Ok(Some(ConnectionTermination::ProtocolViolation)); } + HostReceive::PeerClosed => return Ok(Some(ConnectionTermination::PeerClosed)), }; let negotiated = request.protocol_version == BROKER_PROTOCOL_VERSION; @@ -121,7 +122,7 @@ where } } - Ok(NegotiatedConnection::Active(session)) + Ok(None) } fn serve_request_loop( @@ -265,11 +266,6 @@ fn event_readiness_notification( } } -enum NegotiatedConnection { - Active(BrokerSession), - Terminated(ConnectionTermination), -} - /// Terminal outcome after processing one broker connection. #[derive(Clone, Copy, Debug, PartialEq, Eq)] #[non_exhaustive]