Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 123 additions & 10 deletions litebox_broker_host/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,26 @@

//! Channel-neutral broker-side protocol/core adapter.
//!
//! This crate wires `litebox_broker_core` to any implementation of the neutral
//! host-side control-channel trait. Concrete channels live in separate crates such as
//! `litebox_broker_transport`.
//! This crate wires `litebox_broker_core` to implementations of the neutral
//! host-side control and notification channel traits. Concrete channels live in
//! separate crates such as `litebox_broker_transport`.

#![no_std]

#[cfg(test)]
extern crate std;

use litebox_broker_core::{BrokerCore, BrokerSession, CallerCredential};
use litebox_broker_protocol::BROKER_PROTOCOL_VERSION;
use litebox_broker_protocol::channel::{HostControlChannel, HostReceive, PeerCredential};
use litebox_broker_protocol::channel::{
HostControlChannel, HostNotificationChannel, HostReceive, PeerCredential,
};
use litebox_broker_protocol::error::ErrorCode;
use litebox_broker_protocol::event::{AddEventResponse, CreateEventResponse, WaitEventResponse};
use litebox_broker_protocol::message::{
BrokerHandshakeResponse, BrokerRequest, BrokerResponse, EventRequest, EventResponse,
BrokerHandshakeResponse, BrokerNotification, BrokerRequest, BrokerResponse,
EventReadinessNotification, EventRequest, EventResponse,
};
use litebox_broker_protocol::{BROKER_PROTOCOL_VERSION, ObjectHandle};

mod error;

Expand All @@ -30,6 +33,41 @@ pub fn serve_connection<Channel>(
core: &BrokerCore,
channel: &mut Channel,
) -> Result<ConnectionTermination, Channel::Error>
where
Channel: HostControlChannel,
{
let session = create_session(core, channel)?;
if let Some(termination) = negotiate_protocol(channel)? {
return Ok(termination);
}
serve_request_loop(channel, &session)
}

/// Authenticates, negotiates, and serves one broker connection with notifications.
///
/// Control requests and responses remain strictly paired on the control channel.
/// Readiness notifications are sent over the separate notification channel after
/// state-changing event requests complete.
pub fn serve_connection_with_notifications<ControlChannel, NotificationChannel>(
core: &BrokerCore,
control_channel: &mut ControlChannel,
notification_channel: &mut NotificationChannel,
) -> Result<ConnectionTermination, ControlChannel::Error>
where
ControlChannel: HostControlChannel,
NotificationChannel: HostNotificationChannel<Error = ControlChannel::Error>,
{
let session = create_session(core, control_channel)?;
if let Some(termination) = negotiate_protocol(control_channel)? {
return Ok(termination);
}
serve_request_loop_with_notifications(control_channel, &session, notification_channel)
}

fn create_session<Channel>(
core: &BrokerCore,
channel: &Channel,
) -> Result<BrokerSession, Channel::Error>
where
Channel: HostControlChannel,
{
Expand All @@ -40,8 +78,15 @@ where
PeerCredential::Unauthenticated => CallerCredential::Unauthenticated,
_ => return Err(BrokerHostError::Broker(ErrorCode::PolicyDenied)),
};
let session = core.create_session(caller_credential)?;
Ok(core.create_session(caller_credential)?)
}

fn negotiate_protocol<Channel>(
channel: &mut Channel,
) -> Result<Option<ConnectionTermination>, Channel::Error>
where
Channel: HostControlChannel,
{
loop {
let request = match channel
.recv_handshake_request()
Expand All @@ -54,9 +99,9 @@ where
ErrorCode::ProtocolState,
))
.map_err(BrokerHostError::Channel)?;
return Ok(ConnectionTermination::ProtocolViolation);
return Ok(Some(ConnectionTermination::ProtocolViolation));
}
HostReceive::PeerClosed => return Ok(ConnectionTermination::PeerClosed),
HostReceive::PeerClosed => return Ok(Some(ConnectionTermination::PeerClosed)),
};

let negotiated = request.protocol_version == BROKER_PROTOCOL_VERSION;
Expand All @@ -77,7 +122,7 @@ where
}
}

serve_request_loop(channel, &session)
Ok(None)
}

fn serve_request_loop<Channel>(
Expand Down Expand Up @@ -108,6 +153,45 @@ where
Ok(ConnectionTermination::PeerClosed)
}

fn serve_request_loop_with_notifications<ControlChannel, NotificationChannel>(
control_channel: &mut ControlChannel,
session: &BrokerSession,
notification_channel: &mut NotificationChannel,
) -> Result<ConnectionTermination, ControlChannel::Error>
where
ControlChannel: HostControlChannel,
NotificationChannel: HostNotificationChannel<Error = ControlChannel::Error>,
{
loop {
let request = match control_channel
.recv_request()
.map_err(BrokerHostError::Channel)?
{
HostReceive::Message(request) => request,
HostReceive::ProtocolViolation => {
control_channel
.send_response(&BrokerResponse::Error(ErrorCode::ProtocolState))
.map_err(BrokerHostError::Channel)?;
return Ok(ConnectionTermination::ProtocolViolation);
}
HostReceive::PeerClosed => break,
};

let readiness_handle = event_readiness_notification_handle(&request);
let response = handle_request(session, request);
control_channel
.send_response(&response)
.map_err(BrokerHostError::Channel)?;
if let Some(notification) = event_readiness_notification(readiness_handle, &response) {
notification_channel
.send_notification(&notification)
.map_err(BrokerHostError::Channel)?;
}
}

Ok(ConnectionTermination::PeerClosed)
}

fn handle_request(session: &BrokerSession, request: BrokerRequest) -> BrokerResponse {
match request {
BrokerRequest::CloseObject(handle) => match session.close_object_reference(handle) {
Expand Down Expand Up @@ -153,6 +237,35 @@ fn handle_event_request(session: &BrokerSession, request: EventRequest) -> Broke
}
}

fn event_readiness_notification_handle(request: &BrokerRequest) -> Option<ObjectHandle> {
match request {
BrokerRequest::Event(EventRequest::Add(request)) => Some(request.handle),
BrokerRequest::Event(EventRequest::Consume(request)) => Some(request.handle),
_ => None,
}
}

fn event_readiness_notification(
handle: Option<ObjectHandle>,
response: &BrokerResponse,
) -> Option<BrokerNotification> {
match (handle, response) {
(Some(handle), BrokerResponse::Event(EventResponse::Add(response))) => Some(
BrokerNotification::EventReadiness(EventReadinessNotification {
handle,
readiness: response.readiness,
}),
),
(Some(handle), BrokerResponse::Event(EventResponse::Consume(response))) => Some(
BrokerNotification::EventReadiness(EventReadinessNotification {
handle,
readiness: response.readiness,
}),
),
_ => None,
}
}

/// Terminal outcome after processing one broker connection.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[non_exhaustive]
Expand Down
32 changes: 31 additions & 1 deletion litebox_broker_protocol/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
// Licensed under the MIT license.

use crate::message::{
BrokerHandshakeRequest, BrokerHandshakeResponse, BrokerRequest, BrokerResponse,
BrokerHandshakeRequest, BrokerHandshakeResponse, BrokerNotification, BrokerRequest,
BrokerResponse,
};

/// Peer identity information supplied by the channel or host layer.
Expand Down Expand Up @@ -85,3 +86,32 @@ pub trait HostControlChannel {
/// Sends one active broker response.
fn send_response(&mut self, response: &BrokerResponse) -> Result<(), Self::Error>;
}

/// Local-side receive channel for broker-initiated asynchronous notifications.
///
/// A notification channel is separate from the control channel so active broker
/// requests remain strictly paired with their responses. The deployment is
/// responsible for binding this channel to the same authenticated broker
/// association as the matching control channel.
pub trait LocalNotificationChannel {
/// Channel-specific error type.
type Error;

/// Receives one broker notification.
///
/// Returns `Ok(None)` when the broker closed the channel cleanly before
/// starting another notification frame.
fn recv_notification(&mut self) -> Result<Option<BrokerNotification>, Self::Error>;
}

/// Host-side send channel for broker-initiated asynchronous notifications.
///
/// Implementations carry notification frames only; object operation responses
/// continue to use [`HostControlChannel::send_response`].
pub trait HostNotificationChannel {
/// Channel-specific error type.
type Error;

/// Sends one broker notification.
fn send_notification(&mut self, notification: &BrokerNotification) -> Result<(), Self::Error>;
}
22 changes: 21 additions & 1 deletion litebox_broker_protocol/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use crate::error::ErrorCode;
use crate::event::{
AddEventRequest, AddEventResponse, ConsumeEventRequest, ConsumeEventResponse,
CreateEventRequest, CreateEventResponse, WaitEventRequest, WaitEventResponse,
CreateEventRequest, CreateEventResponse, ReadinessState, WaitEventRequest, WaitEventResponse,
};
use crate::{ObjectHandle, ProtocolVersion};

Expand Down Expand Up @@ -84,3 +84,23 @@ pub enum EventResponse {
/// Consume operation response.
Consume(ConsumeEventResponse),
}

/// Broker-initiated asynchronous notification.
///
/// Notifications are level-triggered snapshots and may be coalesced or
/// duplicated by a transport. Local waiters must treat them as wakeups to
/// re-check authoritative state, not as ordered state transitions.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum BrokerNotification {
/// Readiness changed or should be re-checked for a broker-owned event object.
EventReadiness(EventReadinessNotification),
}

/// Readiness notification for a broker-owned event object.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct EventReadinessNotification {
/// Event object handle.
pub handle: ObjectHandle,
/// Current broker-authoritative readiness snapshot.
pub readiness: ReadinessState,
}
Loading