Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion crates/client-api/src/routes/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,9 @@ fn map_procedure_error(e: ProcedureCallError, procedure: &str) -> (StatusCode, S
StatusCode::NOT_FOUND
}
ProcedureCallError::OutOfEnergy => StatusCode::PAYMENT_REQUIRED,
ProcedureCallError::InternalError(_) => StatusCode::INTERNAL_SERVER_ERROR,
ProcedureCallError::GuestPanic(_) | ProcedureCallError::InvalidReturnValue(_) => {
StatusCode::INTERNAL_SERVER_ERROR
}
};
log::error!("Error while invoking procedure {e:#}");
(status_code, format!("{:#}", anyhow::anyhow!(e)))
Expand Down
228 changes: 132 additions & 96 deletions crates/client-api/src/routes/subscribe.rs

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions crates/core/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ mod message_handlers_v3;
pub mod messages;

pub use client_connection::{
ClientConfig, ClientConnection, ClientConnectionReceiver, ClientConnectionSender, ClientSendError, DataMessage,
MeteredDeque, MeteredReceiver, MeteredSender, MeteredUnboundedReceiver, MeteredUnboundedSender, Protocol,
WsVersion,
ClientConfig, ClientConnection, ClientConnectionReceiver, ClientConnectionSender, ClientDisconnectError,
ClientDisconnectSender, ClientSendError, DataMessage, MeteredDeque, MeteredReceiver, MeteredSender,
MeteredUnboundedReceiver, MeteredUnboundedSender, Protocol, WsVersion,
};
pub use client_connection_index::ClientActorIndex;
pub use message_handlers::MessageHandleError;
Expand Down
156 changes: 125 additions & 31 deletions crates/core/src/client/client_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ use spacetimedb_lib::identity::{AuthCtx, RequestId};
use spacetimedb_lib::metrics::ExecutionMetrics;
use spacetimedb_lib::Identity;
use tokio::sync::mpsc::error::{SendError, TrySendError};
use tokio::sync::{mpsc, oneshot, watch};
use tokio::task::AbortHandle;
use tokio::sync::{mpsc, watch};
use tracing::{trace, warn};

#[derive(PartialEq, Eq, Clone, Copy, Hash, Debug)]
Expand Down Expand Up @@ -262,7 +261,15 @@ pub struct ClientConnectionSender {
pub auth: ConnectionAuthCtx,
pub config: ClientConfig,
sendtx: mpsc::Sender<ClientUpdate>,
abort_handle: AbortHandle,
/// Optional because dummy/test senders are not necessarily backed by a
/// live websocket actor control queue.
///
/// Production websocket-backed senders receive this at spawn-time so they
/// can request a transport-level close through the websocket control path.
/// Test constructors such as [`ClientConnectionSender::dummy_with_channel`]
/// and [`ClientConnectionSender::dummy`] intentionally leave this as
/// `None` unless a test explicitly wires a control queue.
disconnect_tx: Option<ClientDisconnectSender>,
cancelled: AtomicBool,

/// Handles on Prometheus metrics related to connections to this database.
Expand Down Expand Up @@ -309,6 +316,21 @@ impl ClientConnectionMetrics {
}
}

#[derive(Clone, Debug)]
pub struct ClientDisconnectSender(mpsc::UnboundedSender<crate::client::messages::CloseFrame>);

impl ClientDisconnectSender {
pub fn new(inner: mpsc::UnboundedSender<crate::client::messages::CloseFrame>) -> Self {
Self(inner)
}

pub fn send(&self, close_frame: crate::client::messages::CloseFrame) -> Result<(), ClientDisconnectError> {
self.0
.send(close_frame)
.map_err(|_| ClientDisconnectError::Disconnected)
}
}

#[derive(Debug, thiserror::Error)]
pub enum ClientSendError {
#[error("client disconnected")]
Expand All @@ -317,18 +339,23 @@ pub enum ClientSendError {
Cancelled,
}

#[derive(Debug, thiserror::Error)]
pub enum ClientDisconnectError {
#[error("client was already cancelled")]
Cancelled,
#[error("disconnect channel disconnected")]
Disconnected,
#[error("disconnect handle is not configured")]
NoDisconnectHandle,
}

impl ClientConnectionSender {
pub fn dummy_with_channel(
id: ClientActorId,
config: ClientConfig,
offset_supply: impl DurableOffsetSupply + 'static,
) -> (Self, ClientConnectionReceiver) {
let (sendtx, rx) = mpsc::channel(CLIENT_CHANNEL_CAPACITY_TEST);
// just make something up, it doesn't need to be attached to a real task
let abort_handle = match tokio::runtime::Handle::try_current() {
Ok(h) => h.spawn(async {}).abort_handle(),
Err(_) => tokio::runtime::Runtime::new().unwrap().spawn(async {}).abort_handle(),
};

let receiver = ClientConnectionReceiver::new(config.confirmed_reads, MeteredReceiver::new(rx), offset_supply);
let cancelled = AtomicBool::new(false);
Expand All @@ -346,7 +373,7 @@ impl ClientConnectionSender {
auth: ConnectionAuthCtx::try_from(dummy_claims).expect("dummy claims should always be valid"),
config,
sendtx,
abort_handle,
disconnect_tx: None,
cancelled,
metrics: None,
};
Expand All @@ -357,10 +384,36 @@ impl ClientConnectionSender {
Self::dummy_with_channel(id, config, offset_supply).0
}

#[cfg(test)]
pub(crate) fn dummy_with_disconnect_channel(
id: ClientActorId,
config: ClientConfig,
offset_supply: impl DurableOffsetSupply + 'static,
) -> (
Self,
ClientConnectionReceiver,
mpsc::UnboundedReceiver<crate::client::messages::CloseFrame>,
) {
let (mut sender, receiver) = Self::dummy_with_channel(id, config, offset_supply);
let (disconnect_tx, disconnect_rx) = mpsc::unbounded_channel();
sender.disconnect_tx = Some(ClientDisconnectSender::new(disconnect_tx));
(sender, receiver, disconnect_rx)
}

pub fn is_cancelled(&self) -> bool {
self.cancelled.load(Ordering::Relaxed)
}

pub fn disconnect(&self, close_frame: crate::client::messages::CloseFrame) -> Result<(), ClientDisconnectError> {
if self.cancelled.load(Relaxed) {
return Err(ClientDisconnectError::Cancelled);
}
self.disconnect_tx
.as_ref()
.ok_or(ClientDisconnectError::NoDisconnectHandle)?
.send(close_frame)
}

/// Send a message to the client. For data-related messages, you should probably use
/// `BroadcastQueue::send` to ensure that the client sees data messages in a consistent order.
///
Expand Down Expand Up @@ -393,8 +446,13 @@ impl ClientConnectionSender {

match self.sendtx.try_send(message) {
Err(mpsc::error::TrySendError::Full(_)) => {
// we've hit CLIENT_CHANNEL_CAPACITY messages backed up in
// the channel, so forcibly kick the client
// We've hit `CLIENT_CHANNEL_CAPACITY` messages backed up in the channel,
// so forcibly kick the client.
//
// Mark the sender cancelled first so subsequent ordered sends
// fail fast immediately, then request websocket close using the
// same control-plane close path as any other server-initiated
// disconnect.
tracing::warn!(
identity = %self.id.identity,
connection_id = %self.id.connection_id,
Expand All @@ -406,8 +464,13 @@ impl ClientConnectionSender {
self.id,
self.sendtx.capacity(),
);
self.abort_handle.abort();
self.cancelled.store(true, Ordering::Relaxed);
if let Some(disconnect_tx) = &self.disconnect_tx {
let _ = disconnect_tx.send(crate::client::messages::CloseFrame {
code: crate::client::messages::CloseCode::Again,
reason: "client channel capacity exceeded".into(),
});
}
return Err(ClientSendError::Cancelled);
}
Err(mpsc::error::TrySendError::Closed(_)) => return Err(ClientSendError::Disconnected),
Expand Down Expand Up @@ -803,6 +866,7 @@ impl ClientConnection {
config: ClientConfig,
replica_id: u64,
mut module_rx: watch::Receiver<ModuleHost>,
disconnect_tx: Option<ClientDisconnectSender>,
actor: impl FnOnce(ClientConnection, ClientConnectionReceiver) -> Fut,
_proof_of_client_connected_call: Connected,
) -> ClientConnection
Expand All @@ -817,25 +881,9 @@ impl ClientConnection {

let (sendtx, sendrx) = mpsc::channel::<ClientUpdate>(CLIENT_CHANNEL_CAPACITY);

let (fut_tx, fut_rx) = oneshot::channel::<Fut>();
// weird dance so that we can get an abort_handle into ClientConnection
let module_info = module.info.clone();
let database_identity = module_info.database_identity;
let client_identity = id.identity;
let abort_handle = tokio::spawn(async move {
let Ok(fut) = fut_rx.await else { return };

let _gauge_guard = module_info.metrics.connected_clients.inc_scope();
module_info.metrics.ws_clients_spawned.inc();
scopeguard::defer! {
let database_identity = module_info.database_identity;
log::warn!("websocket connection aborted for client identity `{client_identity}` and database identity `{database_identity}`");
module_info.metrics.ws_clients_aborted.inc();
};

fut.await
})
.abort_handle();

let metrics = ClientConnectionMetrics::new(database_identity, config.protocol);
let receiver = ClientConnectionReceiver::new(
Expand All @@ -849,7 +897,7 @@ impl ClientConnection {
auth,
config,
sendtx,
abort_handle,
disconnect_tx,
cancelled: AtomicBool::new(false),
metrics: Some(metrics),
});
Expand All @@ -861,8 +909,17 @@ impl ClientConnection {
};

let actor_fut = actor(this.clone(), receiver);
// if this fails, the actor() function called .abort(), which like... okay, I guess?
let _ = fut_tx.send(actor_fut);
tokio::spawn(async move {
let _gauge_guard = module_info.metrics.connected_clients.inc_scope();
module_info.metrics.ws_clients_spawned.inc();
scopeguard::defer! {
let database_identity = module_info.database_identity;
log::warn!("websocket connection aborted for client identity `{client_identity}` and database identity `{database_identity}`");
module_info.metrics.ws_clients_aborted.inc();
};

actor_fut.await
});

this
}
Expand Down Expand Up @@ -1433,4 +1490,41 @@ mod tests {
offset.mark_durable_at(3);
assert_received_update(receiver.recv()).await;
}

#[test]
fn disconnect_without_handle_returns_no_disconnect_handle() {
let sender = ClientConnectionSender::dummy(
ClientActorId::for_test(Identity::ZERO),
ClientConfig::for_test(),
NoneDurableOffset,
);
let res = sender.disconnect(crate::client::messages::CloseFrame {
code: crate::client::messages::CloseCode::Away,
reason: "disconnect".into(),
});
assert_matches!(res, Err(ClientDisconnectError::NoDisconnectHandle));
}

#[test]
fn send_overflow_marks_cancelled_and_emits_disconnect() {
let (sender, _receiver, mut disconnect_rx) = ClientConnectionSender::dummy_with_disconnect_channel(
ClientActorId::for_test(Identity::ZERO),
ClientConfig::for_test(),
NoneDurableOffset,
);

for _ in 0..CLIENT_CHANNEL_CAPACITY_TEST {
sender.send_message(None, empty_tx_update()).unwrap();
}

let res = sender.send_message(None, empty_tx_update());
assert_matches!(res, Err(ClientSendError::Cancelled));
assert!(sender.is_cancelled());

let close_frame = disconnect_rx.try_recv().expect("expected disconnect request");
assert_eq!(close_frame.code, crate::client::messages::CloseCode::Again);

let res = sender.send_message(None, empty_tx_update());
assert_matches!(res, Err(ClientSendError::Cancelled));
}
}
5 changes: 4 additions & 1 deletion crates/core/src/client/message_handlers.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::{ClientConnection, DataMessage, WsVersion};
use super::{messages::CloseFrame, ClientConnection, DataMessage, WsVersion};
use crate::client::message_handlers_v1::MessageExecutionError;
use spacetimedb_lib::bsatn;
use std::time::Instant;
Expand All @@ -15,6 +15,9 @@ pub enum MessageHandleError {
#[error(transparent)]
Execution(#[from] MessageExecutionError),

#[error("Client should be disconnected with close frame {0:?}")]
DisconnectClient(CloseFrame),

#[error("unsupported websocket version: {0}")]
UnsupportedVersion(&'static str),
}
Expand Down
39 changes: 27 additions & 12 deletions crates/core/src/client/message_handlers_v2.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::client::MessageExecutionError;
use crate::client::{messages::CloseFrame, MessageExecutionError};

use super::{ClientConnection, DataMessage, MessageHandleError};
use serde::de::Error as _;
Expand Down Expand Up @@ -52,22 +52,37 @@ pub(super) async fn handle_decoded_message(
let res = client.enqueue_reducer_v2(reducer, args, request_id, timer, flags).await;
match res {
Ok(_) => {
// If this was not a success, we would have already sent an error message.
// If this was not a success, i.e. the reducer returned an error and rolled back,
// we have already sent an error message.
Ok(())
}
Err(e) => {
let err_msg = format!("{e:#}");
let server_message = ws_v2::ServerMessage::ReducerResult(ws_v2::ReducerResult {
request_id,
// Maybe we should use the same timestamp that was used for the reducer context, but this is probably fine for now.
timestamp: Timestamp::now(),
result: ws_v2::ReducerOutcome::InternalError(err_msg.into()),
});
// TODO: Should we kill the client here, or does it mean the client is already dead.
if let Err(send_err) = client.send_message(None, server_message) {
log::warn!("Failed to send reducer error to client: {send_err}");

if let Some(code) = e.close_code() {
// pgoldman 2026-05-14: I've sorta bolted on this error path,
// which attempts to instruct clients on how to handle disconnects,
// as a way to bypass the existing error-handling code which goes through `MessageExecutionError`.
// Prior to my changes, an error in `CallReducer` never resulted in a `MessageExecutionError`,
// as all errors were treated like `ErrorClientConnectionBehavior::RespondError`.
return Err(MessageHandleError::DisconnectClient(CloseFrame {
code,
reason: err_msg.into(),
}));
} else {
let server_message = ws_v2::ServerMessage::ReducerResult(ws_v2::ReducerResult {
request_id,
// Maybe we should use the same timestamp that was used for the reducer context, but this is probably fine for now.
timestamp: Timestamp::now(),
result: ws_v2::ReducerOutcome::InternalError(err_msg.into()),
});

// TODO: Should we kill the client here, or does it mean the client is already dead.
if let Err(send_err) = client.send_message(None, server_message) {
log::warn!("Failed to send reducer error to client: {send_err}");
}
Ok(())
}
Ok(())
}
}
}
Expand Down
Loading
Loading