From c7ac283d6b091b7a54fab9b584e4f29fc2b773dc Mon Sep 17 00:00:00 2001 From: carter Date: Fri, 15 May 2026 12:48:14 -0600 Subject: [PATCH 1/5] Mostly working version of refactor to remove message passing indirection --- CHANGELOG.md | 2 + Cargo.lock | 32 +- .../tests/ros1_native_integration_tests.rs | 8 +- roslibrust_ros1/Cargo.toml | 1 + roslibrust_ros1/src/master_client.rs | 146 +++- roslibrust_ros1/src/node/actor.rs | 726 +++++------------- roslibrust_ros1/src/node/handle.rs | 25 +- roslibrust_ros1/src/node/xmlrpc.rs | 219 ++++-- roslibrust_ros1/src/publisher.rs | 13 +- roslibrust_ros1/src/service_server.rs | 31 +- 10 files changed, 553 insertions(+), 650 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4baa227d..d94d59f1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed +- @DomWilliamsEE fix ros1 subscribers not automatically unsubscribing from the master when dropped. + ### Changed ## 0.20.0 - March 2nd, 2026 diff --git a/Cargo.lock b/Cargo.lock index c9cbaecb..9cb67585 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3547,6 +3547,7 @@ dependencies = [ "thiserror 2.0.18", "tokio", "tokio-util", + "ureq", ] [[package]] @@ -5098,6 +5099,22 @@ dependencies = [ "syn 2.0.114", ] +[[package]] +name = "ureq" +version = "2.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "02d1a66277ed75f640d608235660df48c8e3c19f3b4edb6a263315626cc3c01d" +dependencies = [ + "base64 0.22.1", + "flate2", + "log", + "once_cell", + "rustls", + "rustls-pki-types", + "url", + "webpki-roots 0.26.11", +] + [[package]] name = "url" version = "2.5.8" @@ -5309,6 +5326,15 @@ dependencies = [ "rustls-pki-types", ] +[[package]] +name = "webpki-roots" +version = "0.26.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "521bc38abb08001b01866da9f51eb7c5d647a19260e00054a8c7fd5f9e57f7a9" +dependencies = [ + "webpki-roots 1.0.5", +] + [[package]] name = "webpki-roots" version = "1.0.5" @@ -6057,7 +6083,7 @@ dependencies = [ "tokio", "tokio-util", "tracing", - "webpki-roots", + "webpki-roots 1.0.5", "x509-parser", "zenoh-buffers", "zenoh-codec", @@ -6086,7 +6112,7 @@ dependencies = [ "tokio", "tokio-util", "tracing", - "webpki-roots", + "webpki-roots 1.0.5", "zenoh-config", "zenoh-core", "zenoh-link-commons", @@ -6173,7 +6199,7 @@ dependencies = [ "tokio-rustls", "tokio-util", "tracing", - "webpki-roots", + "webpki-roots 1.0.5", "x509-parser", "zenoh-config", "zenoh-core", diff --git a/roslibrust/tests/ros1_native_integration_tests.rs b/roslibrust/tests/ros1_native_integration_tests.rs index 202af596..748a5206 100644 --- a/roslibrust/tests/ros1_native_integration_tests.rs +++ b/roslibrust/tests/ros1_native_integration_tests.rs @@ -577,14 +577,14 @@ mod tests { assert!(data.is_subscribed("/test_cleanup_sub", "/test_node_cleanup")); assert!(data.is_service_provider("/test_cleanup_srv", "/test_node_cleanup")); + // Check the Arc strong count before drop + debug!("Arc strong_count before drop: {}", nh.arc_strong_count()); + // Drop our node handle std::mem::drop(nh); + debug!("Drop has happened"); // Confirm here that Node actually got shut down - debug!("Drop has happened"); - // Delay to allow destructor to complete - tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; - debug!("sleep is over"); let data = master_client.get_system_state().await.unwrap(); info!("Got data after drop: {data:?}"); diff --git a/roslibrust_ros1/Cargo.toml b/roslibrust_ros1/Cargo.toml index 5780646a..7c3bacef 100644 --- a/roslibrust_ros1/Cargo.toml +++ b/roslibrust_ros1/Cargo.toml @@ -24,6 +24,7 @@ test-log = { workspace = true } # These are definitely needed by this crate: reqwest = { version = "0.11" } +ureq = "2.10" serde_xmlrpc = { version = "0.2" } roslibrust_serde_rosmsg = { workspace = true } hyper = { version = "0.14", features = ["server"] } diff --git a/roslibrust_ros1/src/master_client.rs b/roslibrust_ros1/src/master_client.rs index 139f8140..503f5336 100644 --- a/roslibrust_ros1/src/master_client.rs +++ b/roslibrust_ros1/src/master_client.rs @@ -32,6 +32,18 @@ pub struct MasterClient { id: String, } +/// A synchronous version of MasterClient for use in Drop implementations +/// This client uses blocking HTTP calls (via ureq) and can be safely called from synchronous contexts +#[derive(Clone)] +pub struct SyncMasterClient { + // Address at which the rosmaster should be found + master_uri: String, + // Address at which this node should be reached + client_uri: String, + // An id for this node + id: String, +} + /// Format of data returned by rosmaster's getSystemState #[derive(Debug)] struct StateEntry { @@ -347,10 +359,110 @@ impl MasterClient { } } +impl SyncMasterClient { + /// Constructs a new synchronous client for communicating with a ros master + /// - master_uri: Expects a fully resolved uri for the master e.g. "http://localhost:11311" + /// - client_uri: The URI that should be told to other Nodes / Master to reach this nodes xmlrpc server + /// - id: A client_id to use when communicating with the master, expected to be a valid ros name e.g. "/my_node" + pub fn new( + master_uri: impl Into, + client_uri: impl Into, + id: impl Into, + ) -> Self { + SyncMasterClient { + master_uri: master_uri.into(), + client_uri: client_uri.into(), + id: id.into(), + } + } + + fn post( + &self, + request: String, + ) -> Result { + trace!("Sending master (sync): {request}"); + + // Create a client with reasonable timeouts for cleanup operations + let agent = ureq::builder() + .timeout(std::time::Duration::from_secs(2)) + .build(); + + let response = agent + .post(&self.master_uri) + .send_string(&request) + .map_err(|e| { + // Convert ureq error to our error type + RosMasterError::MasterError(format!("HTTP request failed: {}", e)) + })? + .into_string() + .map_err(|e| RosMasterError::MasterError(format!("Failed to read response: {}", e)))?; + trace!("Got response (sync): {response}"); + let (status_code, msg, data) = + serde_xmlrpc::response_from_str::<(i8, String, T)>(&response)?; + match status_code { + 1 => { + trace!("Parsed from rosmaster (sync): {msg:?} {data:?}"); + } + _ => { + return Err(RosMasterError::MasterError(msg)); + } + }; + Ok(data) + } + + /// Synchronously unregister a subscriber from the master + pub fn unregister_subscriber(&self, topic: impl Into) -> Result { + let body = serde_xmlrpc::request_to_string( + "unregisterSubscriber", + vec![ + self.id.clone().into(), + topic.into().into(), + self.client_uri.clone().into(), + ], + )?; + let x: u8 = self.post(body)?; + Ok(x.eq(&1)) + } + + /// Synchronously unregister a publisher from the master + pub fn unregister_publisher(&self, topic: impl Into) -> Result { + let body = serde_xmlrpc::request_to_string( + "unregisterPublisher", + vec![ + self.id.clone().into(), + topic.into().into(), + self.client_uri.clone().into(), + ], + )?; + let x: u8 = self.post(body)?; + Ok(x.eq(&1)) + } + + /// Synchronously unregister a service from the master + pub fn unregister_service( + &self, + service: impl Into, + service_uri: impl Into, + ) -> Result { + let body = serde_xmlrpc::request_to_string( + "unregisterService", + vec![ + self.id.clone().into(), + service.into().into(), + service_uri.into().into(), + ], + )?; + let x: u8 = self.post(body)?; + Ok(x.eq(&1)) + } +} + #[cfg(feature = "ros1_test")] #[cfg(test)] mod test { + use crate::SyncMasterClient; + use super::{MasterClient, RosMasterError}; const TEST_NODE_ID: &str = "/native_ros1_test"; @@ -391,14 +503,14 @@ mod test { #[test_log::test(tokio::test)] async fn test_register_and_unregister_service() { let client = test_client().await.unwrap(); - let service = "/my_service"; + let service = "/my_service_for_testing_registration_async"; let service_uri = "http://localhost:11312"; // Register client.register_service(service, service_uri).await.unwrap(); // Confirm it exists assert_eq!( - client.lookup_service("/my_service").await.unwrap(), + client.lookup_service(service).await.unwrap(), service_uri ); @@ -407,6 +519,36 @@ mod test { .unregister_service(service, service_uri) .await .unwrap()); + + // Confirm it is gone + assert!(client.lookup_service(service).await.is_err()) + } + + #[test_log::test(tokio::test)] + async fn test_register_and_unregister_service_sync() { + let client = test_client().await.unwrap(); + let sync_client = test_sync_client(); + let service = "/my_service_for_testing_registration_sync"; + let service_uri = "http://localhost:11312"; + // Register + client + .register_service(service, service_uri) + .await + .unwrap(); + + // Confirm it exists + assert_eq!( + client.lookup_service(service).await.unwrap(), + service_uri + ); + + // Unregister service + assert!(sync_client + .unregister_service(service, service_uri) + .unwrap()); + + // Confirm it's gone + assert!(client.lookup_service(service).await.is_err()); } #[test_log::test(tokio::test)] diff --git a/roslibrust_ros1/src/node/actor.rs b/roslibrust_ros1/src/node/actor.rs index 0c10b2b2..eaf56e58 100644 --- a/roslibrust_ros1/src/node/actor.rs +++ b/roslibrust_ros1/src/node/actor.rs @@ -1,163 +1,35 @@ use crate::{ - names::Name, - node::{XmlRpcServer, XmlRpcServerHandle}, - publisher::Publication, - service_client::ServiceClientLink, - service_server::ServiceServerLink, - subscriber::Subscription, - MasterClient, NodeError, ProtocolParams, ServiceClient, TypeErasedCallback, + names::Name, node::XmlRpcServer, publisher::Publication, service_client::ServiceClientLink, + service_server::ServiceServerLink, subscriber::Subscription, MasterClient, NodeError, + ServiceClient, TypeErasedCallback, }; -use abort_on_drop::ChildTask; use bytes::Bytes; use log::*; use roslibrust_common::{Error, RosMessageType, RosServiceType, ServiceFn}; -use std::{collections::HashMap, io, net::Ipv4Addr, sync::Arc}; -use tokio::sync::{broadcast, mpsc, oneshot}; - -// Carter TODO: -// I kinda hate this entire Msg based abstraction internal to the server -// Why isn't this just a regular async function call? -// I feel like someone was afraid of deadlocks or didn't know how to mutex safely? -// We should be able to just call the function and get a result back instead of doing -// this odd message passing indirection? -#[allow(clippy::type_complexity)] -pub enum NodeMsg { - GetMasterUri { - reply: oneshot::Sender, - }, - GetClientUri { - reply: oneshot::Sender, - }, - GetSubscriptions { - reply: oneshot::Sender>, - }, - GetPublications { - reply: oneshot::Sender>, - }, - SetPeerPublishers { - topic: String, - publishers: Vec, - }, - // This function exists because "shutdown" is one of the XmlRpc Client APIs that is - // technically part of the ROS ecosystem (never really seen it used) - // This results in the node's task ending and the node being dropped. - Shutdown, - RegisterPublisher { - // Uses Bytes for efficient cloning (reference counted) when there are multiple subscribers - reply: oneshot::Sender, mpsc::Sender<()>), String>>, - topic: String, - topic_type: String, - queue_size: usize, - msg_definition: String, - md5sum: String, - latching: bool, - }, - RegisterSubscriber { - // Uses Bytes for efficient cloning (reference counted) when there are multiple subscribers - reply: oneshot::Sender, String>>, - topic: String, - topic_type: String, - queue_size: usize, - msg_definition: String, - md5sum: String, - }, - RegisterServiceClient { - reply: oneshot::Sender>, - service: Name, - service_type: String, - srv_definition: String, - md5sum: String, - }, - RegisterServiceServer { - reply: oneshot::Sender>, - service: Name, - service_type: String, - srv_definition: String, - server: Box, - md5sum: String, - }, - UnregisterServiceServer { - reply: oneshot::Sender>, - service_name: String, - }, - RequestTopic { - reply: oneshot::Sender>, - topic: String, - protocols: Vec, - }, - UnregisterPublisher { - reply: oneshot::Sender>, - topic: String, - }, -} +use std::{ + collections::HashMap, + io, + net::Ipv4Addr, + sync::{Arc, Weak}, +}; +use tokio::sync::{broadcast, mpsc, Mutex}; +use tokio_util::sync::CancellationToken; /// Represents a communication handle to an underlying node server /// The node server handles all communication with ROS Master and keeps /// track of subscriptions, publishers, etc. -/// Things that need to interact with the node server do so through a command channel -/// Some handles are "root" handles that when dropped also drop the node server. +/// When all NodeServerHandles are dropped, the Node is dropped and cleaned up. #[derive(Clone)] pub(crate) struct NodeServerHandle { - pub(crate) node_server_sender: mpsc::UnboundedSender, - // If this handle should keep the underlying node task alive it will hold an - // Arc to the underlying node task. This is an option because internal handles - // within the node shouldn't keep it alive (e.g. what we hand to xml server) - pub(crate) _node_task: Option>>, + // Shared reference to the actual node + pub(crate) node: Arc>, } impl NodeServerHandle { - /// Get the URI of the master node. - pub(crate) async fn get_master_uri(&self) -> Result { - let (sender, receiver) = oneshot::channel(); - self.node_server_sender - .send(NodeMsg::GetMasterUri { reply: sender })?; - Ok(receiver.await?) - } - /// Get the URI of the client node. pub(crate) async fn get_client_uri(&self) -> Result { - let (sender, receiver) = oneshot::channel(); - self.node_server_sender - .send(NodeMsg::GetClientUri { reply: sender })?; - Ok(receiver.await?) - } - - /// Gets the list of topics the node is currently subscribed to. - /// Returns a tuple of (Topic Name, Topic Type) e.g. ("/rosout", "rosgraph_msgs/Log"). - pub(crate) async fn get_subscriptions(&self) -> Result, NodeError> { - let (sender, receiver) = oneshot::channel(); - self.node_server_sender - .send(NodeMsg::GetSubscriptions { reply: sender })?; - Ok(receiver.await?) - } - - /// Gets the list of topic the node is currently publishing to. - /// Returns a tuple of (Topic Name, Topic Type) e.g. ("/rosout", "rosgraph_msgs/Log"). - pub(crate) async fn get_publications(&self) -> Result, NodeError> { - let (sender, receiver) = oneshot::channel(); - self.node_server_sender - .send(NodeMsg::GetPublications { reply: sender })?; - Ok(receiver.await?) - } - - /// Updates the list of know publishers for a given topic - /// This is used to know who to reach out to for updates - pub(crate) fn set_peer_publishers( - &self, - topic: String, - publishers: Vec, - ) -> Result<(), NodeError> { - Ok(self - .node_server_sender - .send(NodeMsg::SetPeerPublishers { topic, publishers })?) - } - - /// Informs the underlying node server to shutdown - /// This will stop all ROS functionality and poison all NodeHandles connected - /// to the underlying node server. - pub(crate) fn shutdown(&self) -> Result<(), NodeError> { - self.node_server_sender.send(NodeMsg::Shutdown)?; - Ok(()) + let node = self.node.lock().await; + Ok(node.client.client_uri().to_owned()) } /// Registers a publisher with the underlying node server @@ -169,19 +41,19 @@ impl NodeServerHandle { queue_size: usize, latching: bool, ) -> Result<(broadcast::Sender, mpsc::Sender<()>), NodeError> { - let (sender, receiver) = oneshot::channel(); - self.node_server_sender.send(NodeMsg::RegisterPublisher { - reply: sender, - topic: topic.to_owned(), - topic_type: T::ROS_TYPE_NAME.to_owned(), + // Create a weak reference to pass to the publication + let weak_node = Arc::downgrade(&self.node); + let mut node = self.node.lock().await; + node.register_publisher( + topic.to_owned(), + T::ROS_TYPE_NAME, queue_size, - msg_definition: T::DEFINITION.to_owned(), - md5sum: T::MD5SUM.to_owned(), + T::DEFINITION.to_owned(), + T::MD5SUM.to_owned(), latching, - })?; - let received = receiver.await?; - received - .map_err(|_err| NodeError::IoError(io::Error::from(io::ErrorKind::ConnectionAborted))) + weak_node, + ) + .await } /// Registers a publisher with the underlying node server @@ -195,8 +67,6 @@ impl NodeServerHandle { queue_size: usize, latching: bool, ) -> Result<(broadcast::Sender, mpsc::Sender<()>), NodeError> { - let (sender, receiver) = oneshot::channel(); - let md5sum_res = roslibrust_common::md5sum::from_message_definition(topic_type, msg_definition); let md5sum = match md5sum_res { @@ -210,31 +80,24 @@ impl NodeServerHandle { Ok(md5sum_rv) => md5sum_rv, }; - self.node_server_sender.send(NodeMsg::RegisterPublisher { - reply: sender, - topic: topic.to_owned(), - topic_type: topic_type.to_owned(), + // Create a weak reference to pass to the publication + let weak_node = Arc::downgrade(&self.node); + let mut node = self.node.lock().await; + node.register_publisher( + topic.to_owned(), + topic_type, queue_size, - msg_definition: msg_definition.to_owned(), + msg_definition.to_owned(), md5sum, latching, - })?; - let received = receiver.await?; - received - .map_err(|_err| NodeError::IoError(io::Error::from(io::ErrorKind::ConnectionAborted))) + weak_node, + ) + .await } pub(crate) async fn unregister_publisher(&self, topic: &str) -> Result<(), NodeError> { - let (sender, receiver) = oneshot::channel(); - self.node_server_sender.send(NodeMsg::UnregisterPublisher { - reply: sender, - topic: topic.to_owned(), - })?; - let rx = receiver.await?; - rx.map_err(|err| { - warn!("Failure while unregistering publisher: {err:?}"); - NodeError::IoError(io::Error::from(io::ErrorKind::ConnectionAborted)) - }) + let mut node = self.node.lock().await; + node.unregister_publisher(topic).await } /// Registers a service client with the underlying node server @@ -244,26 +107,22 @@ impl NodeServerHandle { &self, service_name: &Name, ) -> Result, NodeError> { - // Create a channel for hooking into the node server - let (sender, receiver) = oneshot::channel(); - - // Send the request to the node server and see if it accepts it - self.node_server_sender - .send(NodeMsg::RegisterServiceClient { - reply: sender, - service: service_name.to_owned(), - service_type: T::ROS_SERVICE_NAME.to_owned(), - srv_definition: String::from_iter( - [T::Request::DEFINITION, "\n", T::Response::DEFINITION].into_iter(), - ), - md5sum: T::MD5SUM.to_owned(), + let srv_definition = + String::from_iter([T::Request::DEFINITION, "\n", T::Response::DEFINITION].into_iter()); + + let mut node = self.node.lock().await; + let link = node + .register_service_client( + service_name, + T::ROS_SERVICE_NAME, + &srv_definition, + T::MD5SUM, + ) + .await + .map_err(|err| { + log::error!("Failed to register service client: {err}"); + NodeError::IoError(io::Error::from(io::ErrorKind::ConnectionAborted)) })?; - // Get a channel back from the node server for pushing requests into - let received = receiver.await?; - let link = received.map_err(|err| { - log::error!("Failed to register service client: {err}"); - NodeError::IoError(io::Error::from(io::ErrorKind::ConnectionAborted)) - })?; let sender = link.get_sender(); Ok(ServiceClient::new(service_name, sender, link)) @@ -278,8 +137,6 @@ impl NodeServerHandle { T: RosServiceType, F: ServiceFn, { - let (sender, receiver) = oneshot::channel(); - // Type erase the server function here // Here we encode the type information of the service type passed in as T into the closure // This gives a generic closure that operates on byte arrays that we can then store and use freely @@ -294,38 +151,35 @@ impl NodeServerHandle { }; let server_typeless = Box::new(server_typeless); - self.node_server_sender - .send(NodeMsg::RegisterServiceServer { - reply: sender, - service: service_name.to_owned(), - service_type: T::ROS_SERVICE_NAME.to_owned(), - srv_definition: String::from_iter( - [T::Request::DEFINITION, "\n", T::Response::DEFINITION].into_iter(), - ), - server: server_typeless, - md5sum: T::MD5SUM.to_owned(), - })?; - let received = receiver.await?; - received.map_err(|err| { + let srv_definition = + String::from_iter([T::Request::DEFINITION, "\n", T::Response::DEFINITION].into_iter()); + + let mut node = self.node.lock().await; + node.register_service_server( + service_name, + T::ROS_SERVICE_NAME, + &srv_definition, + server_typeless, + T::MD5SUM, + ) + .await + .map_err(|err| { log::error!("Failed to register service server: {err}"); NodeError::IoError(io::Error::from(io::ErrorKind::ConnectionAborted)) }) } /// Called to remove a service server - /// Delegates to the NodeServer via channel + /// Delegates to the NodeServer pub(crate) async fn unadvertise_service(&self, service_name: &str) -> Result<(), NodeError> { - let (tx, rx) = oneshot::channel(); log::debug!("Queuing unregister service server command for: {service_name:?}"); - self.node_server_sender - .send(NodeMsg::UnregisterServiceServer { - reply: tx, - service_name: service_name.to_string(), - })?; - rx.await?.map_err(|e| { - log::error!("Failed to unadvertise service server {service_name:?}: {e:?}"); - NodeError::IoError(io::Error::from(io::ErrorKind::InvalidData)) - }) + let mut node = self.node.lock().await; + node.unregister_service_server(service_name) + .await + .map_err(|e| { + log::error!("Failed to unadvertise service server {service_name:?}: {e:?}"); + NodeError::IoError(io::Error::from(io::ErrorKind::InvalidData)) + }) } /// Registers a subscription with the underlying node server @@ -338,44 +192,18 @@ impl NodeServerHandle { topic: &str, queue_size: usize, ) -> Result, NodeError> { - // Type here is complicated, this is a channel that we're sending a channel receiver over - // This channel is used to fire back the receiver of the underlying subscription - let (sender, receiver) = oneshot::channel(); - self.node_server_sender.send(NodeMsg::RegisterSubscriber { - reply: sender, - topic: topic.to_owned(), - topic_type: T::ROS_TYPE_NAME.to_owned(), + let mut node = self.node.lock().await; + node.register_subscriber( + topic, + T::ROS_TYPE_NAME, queue_size, - msg_definition: T::DEFINITION.to_owned(), - md5sum: T::MD5SUM.to_owned(), - })?; - let received = receiver.await?; - received.map_err(|err| { + T::DEFINITION, + T::MD5SUM, + ) + .await + .map_err(|err| { log::error!("Failed to register subscriber: {err}"); - NodeError::IoError(io::Error::from(io::ErrorKind::ConnectionAborted)) - }) - } - - // This function provides functionality for the Node's XmlRPC server - // When an XmlRpc request for "requestTopic" comes in the xmlrpc server for the node calls this function - // to marshal the response. - // Users can call this function, but it really doesn't serve much of a purpose outside ROS Pub/Sub communication - // negotiation - pub(crate) async fn request_topic( - &self, - topic: &str, - protocols: &[String], - ) -> Result { - let (sender, receiver) = oneshot::channel(); - self.node_server_sender.send(NodeMsg::RequestTopic { - topic: topic.to_owned(), - protocols: protocols.into(), - reply: sender, - })?; - let received = receiver.await?; - received.map_err(|err| { - log::error!("Fail to coordinate channel between publisher and subscriber: {err}"); - NodeError::IoError(io::Error::from(io::ErrorKind::ConnectionAborted)) + err }) } } @@ -387,15 +215,13 @@ impl NodeServerHandle { /// This is sometimes referred to as the NodeServer in the documentation, many NodeHandles can point to one NodeServer pub(crate) struct Node { // The xmlrpc client this node uses to make requests to master - client: MasterClient, - // Server which handles updates from the rosmaster and other ROS nodes - _xmlrpc_server: XmlRpcServerHandle, - // Receiver for requests to the Node actor - node_msg_rx: mpsc::UnboundedReceiver, + pub(crate) client: MasterClient, + // Synchronous version of the client for use in Drop + sync_client: crate::SyncMasterClient, // Map of topic names to the publishing channels associated with the topic - publishers: HashMap, + pub(crate) publishers: HashMap, // Record of subscriptions this node has - subscriptions: HashMap, + pub(crate) subscriptions: HashMap, // Map of topic names to the service client handles for each topic // Note: decision made to not hold a list of service clients here, instead each call // to register_service_client will create a new service client and return a sender to it @@ -404,13 +230,13 @@ pub(crate) struct Node { // This should give better control of how disconnection and lifetimes work for a given client // service_clients: HashMap, // Map of topic names to service server handles for each topic - service_servers: HashMap, - // TODO MAJOR: need signal to shutdown xmlrpc server when node is dropped - host_addr: Ipv4Addr, - hostname: String, - node_name: Name, - // Store a handle to ourself so that we can pass it out later - node_handle: NodeServerHandle, + pub(crate) service_servers: HashMap, + pub(crate) host_addr: Ipv4Addr, + pub(crate) hostname: String, + pub(crate) node_name: Name, + // Cancellation token to signal shutdown of background tasks (e.g., XML-RPC server) + // When this Node is dropped, it will cancel this token + pub(crate) shutdown_token: CancellationToken, } impl Node { @@ -421,238 +247,45 @@ impl Node { node_name: &Name, addr: Ipv4Addr, ) -> Result { - let (node_sender, node_receiver) = mpsc::unbounded_channel(); - let xml_server_handle = NodeServerHandle { - node_server_sender: node_sender.clone(), - // None here because this handle should not keep task alive - _node_task: None, - }; - // Create our xmlrpc server and bind our socket so we know our port and can determine our local URI - let xmlrpc_server = XmlRpcServer::new(addr, xml_server_handle)?; - let client_uri = format!("http://{hostname}:{}", xmlrpc_server.port()); + // Create a cancellation token for shutdown signaling + let shutdown_token = CancellationToken::new(); + // Bind the xmlrpc server first to get the port, but don't start serving yet + let bound_xmlrpc = XmlRpcServer::bind(addr)?; + let client_uri = format!("http://{hostname}:{}", bound_xmlrpc.port()); + + // Create the master client with the correct URI let rosmaster_client = - MasterClient::new(master_uri, client_uri, node_name.to_string()).await?; - let weak_handle = NodeServerHandle { - node_server_sender: node_sender.clone(), - _node_task: None, - }; - let mut node = Self { + MasterClient::new(master_uri, &client_uri, node_name.to_string()).await?; + + // Create a synchronous master client for use in Drop + let sync_client = + crate::SyncMasterClient::new(master_uri, &client_uri, node_name.to_string()); + + // Create the node + let node = Self { client: rosmaster_client, - _xmlrpc_server: xmlrpc_server, - node_msg_rx: node_receiver, - publishers: std::collections::HashMap::new(), - subscriptions: std::collections::HashMap::new(), - service_servers: std::collections::HashMap::new(), + sync_client, + publishers: HashMap::new(), + subscriptions: HashMap::new(), + service_servers: HashMap::new(), host_addr: addr, hostname: hostname.to_owned(), node_name: node_name.to_owned(), - node_handle: weak_handle, + shutdown_token: shutdown_token.clone(), }; - let t = Arc::new( - tokio::spawn(async move { - loop { - match node.node_msg_rx.recv().await { - Some(NodeMsg::Shutdown) => { - log::info!("Shutdown requested, shutting down node"); - break; - } - Some(node_msg) => { - node.handle_msg(node_msg).await; - } - None => { - // This isn't an really expected case? - log::warn!("Node command channel closed, shutting down"); - break; - } - } - } - }) - .into(), - ); + let node_arc = Arc::new(Mutex::new(node)); - let node_server_handle = NodeServerHandle { - node_server_sender: node_sender, - _node_task: Some(t), - }; - Ok(node_server_handle) - } + // Create a weak reference for the xmlrpc server + // This allows the server to access the node without keeping it alive + let weak_node = Arc::downgrade(&node_arc); - async fn handle_msg(&mut self, msg: NodeMsg) { - match msg { - NodeMsg::GetMasterUri { reply } => { - let _ = reply.send(self.client.get_master_uri().to_owned()); - } - NodeMsg::GetClientUri { reply } => { - let _ = reply.send(self.client.client_uri().to_owned()); - } - NodeMsg::GetSubscriptions { reply } => { - let _ = reply.send( - self.subscriptions - .iter() - .map(|(topic_name, subscription)| { - (topic_name.clone(), subscription.topic_type().to_owned()) - }) - .collect(), - ); - } - NodeMsg::GetPublications { reply } => { - let _ = reply.send( - self.publishers - .iter() - .map(|(key, entry)| (key.clone(), entry.topic_type().to_owned())) - .collect(), - ); - } - NodeMsg::SetPeerPublishers { topic, publishers } => { - if let Some(subscription) = self.subscriptions.get_mut(&topic) { - // First, remove any publishers that are no longer in the list - // This cancels retry loops for publishers that rosmaster says are gone - subscription.remove_stale_publishers(&publishers).await; - - // Then add any new publishers - for publisher_uri in publishers { - if let Err(err) = subscription.add_publisher_source(&publisher_uri).await { - log::error!( - "Unable to create subscribe stream for topic {topic}: {err}" - ); - } - } - } else { - log::warn!( - "Got peer publisher update for topic we weren't subscribed to, ignoring" - ); - } - } - NodeMsg::RegisterPublisher { - reply, - topic, - topic_type, - queue_size, - msg_definition, - md5sum, - latching, - } => { - let res = self - .register_publisher( - topic, - &topic_type, - queue_size, - msg_definition, - md5sum, - latching, - ) - .await; - match res { - Ok(handle) => reply.send(Ok(handle)), - Err(err) => reply.send(Err(err.to_string())), - } - .expect("Failed to reply on oneshot"); - } - NodeMsg::UnregisterPublisher { reply, topic } => { - let _ = reply.send( - self.unregister_publisher(&topic) - .await - .map_err(|err| err.to_string()), - ); - } - NodeMsg::RegisterSubscriber { - reply, - topic, - topic_type, - queue_size, - msg_definition, - md5sum, - } => { - let _ = reply.send( - self.register_subscriber( - &topic, - &topic_type, - queue_size, - &msg_definition, - &md5sum, - ) - .await - .map_err(|err| err.to_string()), - ); - } - NodeMsg::RegisterServiceClient { - reply, - service, - service_type, - srv_definition, - md5sum, - } => { - let _ = reply.send( - self.register_service_client(&service, &service_type, &srv_definition, &md5sum) - .await - .map_err(|err| err.to_string()), - ); - } - NodeMsg::RegisterServiceServer { - reply, - service, - service_type, - srv_definition, - server, - md5sum, - } => { - let _ = reply.send( - self.register_service_server( - &service, - &service_type, - &srv_definition, - server, - &md5sum, - ) - .await - .map_err(|err| err.to_string()), - ); - } - NodeMsg::UnregisterServiceServer { - reply, - service_name, - } => { - let _ = reply.send( - self.unregister_service_server(&service_name) - .await - .map_err(|err| err.to_string()), - ); - } - NodeMsg::RequestTopic { - reply, - topic, - protocols, - } => { - // TODO: Should move the actual implementation similar to RegisterPublisher - if protocols.iter().any(|proto| proto.as_str() == "TCPROS") { - if let Some((_key, publishing_channel)) = - self.publishers.iter().find(|(key, _pub)| *key == &topic) - { - let protocol_params = ProtocolParams { - hostname: self.hostname.clone(), - protocol: String::from("TCPROS"), // Hardcoded as the only option for now - port: publishing_channel.port(), - }; - let _ = reply.send(Ok(protocol_params)); - } else { - let err_str = format!("Got request for topic {topic} from subscriber which this node does not publish"); - log::warn!("{err_str}"); - let _ = reply.send(Err(err_str)); - } - } else { - let err_str = format!( - "No supported protocols in the request from the subscriber: {protocols:?}" - ); - log::error!("{err_str}"); - let _ = reply.send(Err(err_str)); - } - } - NodeMsg::Shutdown => { - unreachable!("This node msg is handled in the wrapping handling code"); - } - } + // Now start serving with the properly initialized node + // Pass the weak reference and cancellation token so the server can shut down when signaled + bound_xmlrpc.serve(weak_node, shutdown_token); + + Ok(NodeServerHandle { node: node_arc }) } async fn register_subscriber( @@ -695,6 +328,7 @@ impl Node { msg_definition: String, md5sum: String, latching: bool, + weak_node: Weak>, ) -> Result<(broadcast::Sender, mpsc::Sender<()>), NodeError> { // Return handle to existing Publication if it exists let existing_entry = { @@ -740,7 +374,7 @@ impl Node { &msg_definition, &md5sum, topic_type, - self.node_handle.clone(), + weak_node, ) .await .map_err(|err| { @@ -752,7 +386,7 @@ impl Node { Ok((sender, shutdown)) } - async fn unregister_publisher(&mut self, topic: &str) -> Result<(), NodeError> { + pub(crate) async fn unregister_publisher(&mut self, topic: &str) -> Result<(), NodeError> { // Tell ros master we are no longer publishing this topic let err1 = self.client.unregister_publisher(topic).await; // Remove the publication from our internal state @@ -844,7 +478,7 @@ impl Node { Ok(()) } - async fn unregister_service_server( + pub(crate) async fn unregister_service_server( &mut self, service_name: &str, ) -> Result<(), Box> { @@ -865,46 +499,76 @@ impl Node { // Clears any extant node connections with the ros master // This is not expected to be called anywhere other than the drop impl - fn shutdown(&mut self) { - // Based on this answer: 3b https://stackoverflow.com/questions/71541765/rust-async-drop - // Make copies of what we need to shut down - let client = self.client.clone(); - let subscriptions = std::mem::take(&mut self.subscriptions); - let publishers = std::mem::take(&mut self.publishers); - let service_servers = std::mem::take(&mut self.service_servers); - // Use hostname for unregistering services (must match what was registered) - let hostname = self.hostname.clone(); - - // Move copies into a future that will do the clean-ups - let future = async move { - debug!("Start shutdown node"); - // Note: we're ignoring all failures here and doing best effort cleanup - // Many of these log messages will be incorrect until we get our cleanup logic dialed in. - for topic in subscriptions.keys() { - debug!("Node shutdown is cleaning up subscription: {topic}"); - let _ = client.unregister_subscriber(topic).await.inspect_err(|_e| { - error!("Failed to unregister subscriber for topic: {topic} while shutting down node"); - }); - debug!("CHECK"); + pub(crate) fn shutdown(&mut self) { + debug!("Node shutdown called for: {}", self.node_name); + + // Tell xmlrpc server to shut down first + self.shutdown_token.cancel(); + + // Spawn cleanup in a detached thread to avoid blocking Drop + // This is necessary because synchronous HTTP calls from Drop can cause issues + // when multiple nodes are dropped simultaneously in a tokio runtime + let sync_client = self.sync_client.clone(); + let subscriptions: Vec = self.subscriptions.keys().cloned().collect(); + let publishers: Vec = self.publishers.keys().cloned().collect(); + let service_servers: Vec<(String, String)> = self + .service_servers + .iter() + .map(|(topic, link)| { + let uri = format!("rosrpc://{}:{}", self.hostname, link.port()); + (topic.clone(), uri) + }) + .collect(); + let node_name = self.node_name.to_string(); + + for topic in subscriptions { + debug!("Node shutdown is cleaning up subscription: {topic}"); + match sync_client.unregister_subscriber(&topic) { + Ok(true) => { + debug!("Successfully unregistered subscriber: {topic}"); + } + Ok(false) => { + warn!("Failed to unregister subscriber: {topic}, it was not advertised"); + } + Err(e) => { + error!( + "Failed to unregister subscriber: {topic} while shutting down node: {e}" + ); + } } + } - for topic in publishers.keys() { - debug!("Node shutdown is cleaning up publishing: {topic}"); - let _ = client.unregister_publisher(topic).await.inspect_err(|_e| { - error!("Failed to unregister publisher for topic: {topic} while shutting down node."); - }); + for topic in publishers { + debug!("Node shutdown is cleaning up publishing: {topic}"); + match sync_client.unregister_publisher(&topic) { + Ok(true) => { + debug!("Successfully unregistered publisher: {topic}"); + } + Ok(false) => { + warn!("Failed to unregister publisher: {topic}, it was not advertised"); + } + Err(e) => { + error!("Failed to unregister publisher: {topic} while shutting down node: {e}"); + } } + } - for (topic, service_link) in &service_servers { - debug!("Node shutdown is cleaning up service: {topic}"); - let uri = format!("rosrpc://{}:{}", hostname, service_link.port()); - let _ = client.unregister_service(topic, uri).await.inspect_err(|_e| { - error!("Failed to unregister server server for topic: {topic} while shutting down node."); - }); + for (topic, uri) in service_servers { + debug!("Node shutdown is cleaning up service: {topic}"); + match sync_client.unregister_service(&topic, uri) { + Ok(true) => { + debug!("Successfully unregistered service: {topic}"); + } + Ok(false) => { + warn!("Failed to unregister service: {topic}, it was not advertised"); + } + Err(e) => { + error!("Failed to unregister service: {topic} while shutting down node: {e}"); + } } - }; - // Spawn shutdown operation in a separate task - tokio::spawn(future); + } + + debug!("Node shutdown complete for: {node_name}"); } } diff --git a/roslibrust_ros1/src/node/handle.rs b/roslibrust_ros1/src/node/handle.rs index 5e63e448..43586466 100644 --- a/roslibrust_ros1/src/node/handle.rs +++ b/roslibrust_ros1/src/node/handle.rs @@ -4,6 +4,7 @@ use crate::{ subscriber::Subscriber, subscriber::SubscriberAny, NodeError, ServiceServer, }; use roslibrust_common::ServiceFn; +use std::sync::Arc; /// Represents a handle to an underlying Node. NodeHandle's can be freely cloned, moved, copied, etc. /// This class provides the user facing API for interacting with ROS. @@ -40,16 +41,11 @@ impl NodeHandle { Ok(nh) } - /// This creates a clone() of NodeHandle that doesn't keep the underlying node alive - /// This should be used for things like ServiceServer which wants to be able to talk to the node - /// but doesn't need to keep the node alive. - pub(crate) fn weak_clone(&self) -> NodeHandle { - NodeHandle { - inner: NodeServerHandle { - node_server_sender: self.inner.node_server_sender.clone(), - _node_task: None, - }, - } + // TODO REMOVE BEFORE MERGE + /// Debug helper to check the Arc strong count + /// This is useful for debugging lifecycle issues + pub fn arc_strong_count(&self) -> usize { + Arc::strong_count(&self.inner.node) } /// This function may be removed... @@ -57,7 +53,9 @@ impl NodeHandle { /// If this function returns false, the backend node server has shut down and this handle is invalid. /// This state should be unreachable by normal usage of the library. pub fn is_ok(&self) -> bool { - !self.inner.node_server_sender.is_closed() + // Check if the Arc still has any strong references + // If it only has weak references, the node has been shut down + Arc::strong_count(&self.inner.node) > 0 } /// Returns the network uri of XMLRPC server for the underlying node. @@ -172,8 +170,9 @@ impl NodeHandle { self.inner .register_service_server::(&service_name, server) .await?; - // Super important. Don't clone self or we create a STRONG NodeHandle that keeps the node alive - Ok(ServiceServer::new(service_name, self.weak_clone())) + // Super important: Pass a Weak reference so ServiceServer doesn't keep the node alive + let weak_node = Arc::downgrade(&self.inner.node); + Ok(ServiceServer::new(service_name, weak_node)) } // TODO Major: This should probably be moved to NodeServerHandle? diff --git a/roslibrust_ros1/src/node/xmlrpc.rs b/roslibrust_ros1/src/node/xmlrpc.rs index e1bdd4c7..062d54b6 100644 --- a/roslibrust_ros1/src/node/xmlrpc.rs +++ b/roslibrust_ros1/src/node/xmlrpc.rs @@ -1,11 +1,13 @@ -use super::NodeServerHandle; -use abort_on_drop::ChildTask; +use super::Node; use hyper::{Body, Response, StatusCode}; use log::*; use std::{ convert::Infallible, net::{Ipv4Addr, SocketAddr}, + sync::Weak, }; +use tokio::sync::Mutex; +use tokio_util::sync::CancellationToken; #[allow(unused)] enum RosXmlStatusCode { @@ -32,54 +34,73 @@ impl RosXmlStatusCode { /// but are intentionally using "XmlRpcServer" in place of where ROS says "Slave API" pub(crate) struct XmlRpcServer {} -pub(crate) struct XmlRpcServerHandle { +/// Intermediate structure holding a bound server that hasn't started serving yet +pub(crate) struct BoundXmlRpcServer { port: u16, - _handle: ChildTask<()>, + server: hyper::server::Builder, } -impl XmlRpcServerHandle { +impl BoundXmlRpcServer { + /// Allows getting the port so the node can know its full URI pub fn port(&self) -> u16 { self.port } -} -impl XmlRpcServer { - #[allow(clippy::new_ret_no_self)] - pub fn new( - host_addr: Ipv4Addr, - node_server: NodeServerHandle, - ) -> Result { + /// Start serving requests with a weak reference to the node and cancellation token + /// The server will shutdown when the cancellation token is cancelled or when the node is dropped + pub fn serve(self, weak_node: Weak>, cancellation_token: CancellationToken) { let make_svc = hyper::service::make_service_fn(move |connection| { debug!("New node xmlrpc connection {connection:?}"); - let node_server = node_server.clone(); + let weak_node = weak_node.clone(); async move { Ok::<_, Infallible>(hyper::service::service_fn(move |req| { - XmlRpcServer::respond(node_server.clone(), req) + XmlRpcServer::respond(weak_node.clone(), req) })) } }); - let host_addr = SocketAddr::from((host_addr, 0)); - let server = hyper::server::Server::try_bind(&host_addr)?; - let server = server.serve(make_svc); - let addr = server.local_addr(); - let handle = tokio::spawn(async { - if let Err(err) = server.await { - log::error!("xmlrpc server encountered error: {err:?}"); + let server = self.server.serve(make_svc); + + tokio::spawn(async move { + tokio::select! { + result = server => { + if let Err(err) = result { + log::error!("xmlrpc server encountered error: {err:?}"); + } + } + _ = cancellation_token.cancelled() => { + debug!("XmlRpc server shutting down due to cancellation"); + } } }); + } +} - Ok(XmlRpcServerHandle { - port: addr.port(), - _handle: handle.into(), - }) +impl XmlRpcServer { + /// Bind to an address without starting to serve + /// Returns a BoundXmlRpcServer that can be used to get the port and then start serving + pub fn bind(host_addr: Ipv4Addr) -> Result { + let host_addr = SocketAddr::from((host_addr, 0)); + let server = hyper::server::Server::try_bind(&host_addr)?; + let port = server.local_addr().port(); + + Ok(BoundXmlRpcServer { port, server }) } // Our actual service handler with our error type async fn respond_inner( - node_server: NodeServerHandle, + weak_node: Weak>, body: hyper::Request, ) -> Result, Box>> { + // Try to upgrade the weak reference to a strong reference + // If this fails, the node has been dropped and we should error out + let node_arc = weak_node.upgrade().ok_or_else(|| { + Box::new(Self::make_error_response( + std::io::Error::new(std::io::ErrorKind::NotFound, "Node has been dropped"), + "Node no longer exists, shutting down XmlRpc server", + StatusCode::SERVICE_UNAVAILABLE, + )) + })?; // Await the bytes of the body let body = hyper::body::to_bytes(body).await.map_err(|e| { Box::new(Self::make_error_response( @@ -111,14 +132,9 @@ impl XmlRpcServer { match method_name.as_str() { "getMasterUri" => { debug!("getMasterUri called by {args:?}"); - match node_server.get_master_uri().await { - Ok(uri) => Self::to_response(uri), - Err(e) => Err(Box::new(Self::make_error_response( - e, - "Unable to retrieve master URI", - StatusCode::INTERNAL_SERVER_ERROR, - ))), - } + let node = node_arc.lock().await; + let uri = node.client.get_master_uri().to_owned(); + Self::to_response(uri) } "getPid" => { debug!("getPid called by {args:?}"); @@ -131,30 +147,36 @@ impl XmlRpcServer { } "getSubscriptions" => { debug!("getSubscriptions called by {args:?}"); - match node_server.get_subscriptions().await { - Ok(subs) => { - match serde_xmlrpc::to_value(subs) { - Ok(subs) => Self::to_response(subs), - Err(e) => Err(Box::new(Self::make_error_response( - e, - "Subscriptions contained names which could not be validly serialized to xmlrpc", - StatusCode::INTERNAL_SERVER_ERROR))) - } - }, - Err(e) => Err(Box::new(Self::make_error_response(e, "Unable to get subscriptions", StatusCode::INTERNAL_SERVER_ERROR))) + let node = node_arc.lock().await; + let subs: Vec<(String, String)> = node + .subscriptions + .iter() + .map(|(topic_name, subscription)| { + (topic_name.clone(), subscription.topic_type().to_owned()) + }) + .collect(); + match serde_xmlrpc::to_value(subs) { + Ok(subs) => Self::to_response(subs), + Err(e) => Err(Box::new(Self::make_error_response( + e, + "Subscriptions contained names which could not be validly serialized to xmlrpc", + StatusCode::INTERNAL_SERVER_ERROR))) } } "getPublications" => { debug!("getPublications called by {args:?}"); - match node_server.get_publications().await { - Ok(pubs) => match serde_xmlrpc::to_value(pubs) { - Ok(pubs) => Self::to_response(pubs), - Err(e) => Err(Box::new(Self::make_error_response( - e, - "Publications contained names which could not be validly serialized to xmlrpc", - StatusCode::INTERNAL_SERVER_ERROR))) - }, - Err(e) => Err(Box::new(Self::make_error_response(e, "Unable to get publications", StatusCode::INTERNAL_SERVER_ERROR))) + let node = node_arc.lock().await; + let pubs: Vec<(String, String)> = node + .publishers + .iter() + .map(|(key, entry)| (key.clone(), entry.topic_type().to_owned())) + .collect(); + match serde_xmlrpc::to_value(pubs) { + Ok(pubs) => Self::to_response(pubs), + Err(e) => Err(Box::new(Self::make_error_response( + e, + "Publications contained names which could not be validly serialized to xmlrpc", + StatusCode::INTERNAL_SERVER_ERROR))) } } "paramUpdate" => { @@ -172,15 +194,24 @@ impl XmlRpcServer { StatusCode::BAD_REQUEST, ) })?; - node_server - .set_peer_publishers(topic, publishers) - .map_err(|e| { - Self::make_error_response( - e, - "Unable to set peer publishers", - StatusCode::INTERNAL_SERVER_ERROR, - ) - })?; + + let mut node = node_arc.lock().await; + if let Some(subscription) = node.subscriptions.get_mut(&topic) { + // First, remove any publishers that are no longer in the list + subscription.remove_stale_publishers(&publishers).await; + // Then add any new publishers + for publisher_uri in publishers { + if let Err(err) = subscription.add_publisher_source(&publisher_uri).await { + log::error!( + "Unable to create subscribe stream for topic {topic}: {err}" + ); + } + } + } else { + log::warn!( + "Got peer publisher update for topic we weren't subscribed to, ignoring" + ); + } // ROS's API is for us to still return an int, but the value is literally named "ignore"... Self::to_response(0) @@ -197,16 +228,39 @@ impl XmlRpcServer { })?; let protocols = protocols.iter().flatten().cloned().collect::>(); debug!("Request for topic {topic} from {caller_id} via protocols {protocols:?}"); - let params = node_server - .request_topic(&topic, &protocols) - .await - .map_err(|e| { - Self::make_error_response( - e, - "Unable to get parameters for requested topic", - StatusCode::INTERNAL_SERVER_ERROR, - ) - })?; + + let node = node_arc.lock().await; + let params = if protocols.iter().any(|proto| proto.as_str() == "TCPROS") { + if let Some((_key, publishing_channel)) = + node.publishers.iter().find(|(key, _pub)| *key == &topic) + { + crate::ProtocolParams { + hostname: node.hostname.clone(), + protocol: String::from("TCPROS"), + port: publishing_channel.port(), + } + } else { + return Err(Box::new(Self::make_error_response( + std::io::Error::new( + std::io::ErrorKind::NotFound, + format!( + "Got request for topic {topic} which this node does not publish" + ), + ), + "Topic not found", + StatusCode::NOT_FOUND, + ))); + } + } else { + return Err(Box::new(Self::make_error_response( + std::io::Error::new( + std::io::ErrorKind::Unsupported, + format!("No supported protocols in request: {protocols:?}"), + ), + "Unsupported protocol", + StatusCode::BAD_REQUEST, + ))); + }; let response = Self::make_success_response( RosXmlStatusCode::Success, @@ -229,13 +283,14 @@ impl XmlRpcServer { ) })?; debug!("Received request for shutdown from {caller_id}: {msg}"); - node_server.shutdown().map_err(|e| { - Self::make_error_response( - e, - "Unable to shutdown", - StatusCode::INTERNAL_SERVER_ERROR, - ) - })?; + + // TODO this is not tested, we need to add a test for this + // Trigger shutdown by spawning a task that will lock and shutdown the node + let node_for_shutdown = node_arc.clone(); + tokio::spawn(async move { + let mut node_guard = node_for_shutdown.lock().await; + node_guard.shutdown(); + }); Self::to_response(0) } @@ -319,11 +374,11 @@ impl XmlRpcServer { // Is the actual function we hand to hyper async fn respond( - node_server: NodeServerHandle, + weak_node: Weak>, body: hyper::Request, ) -> Result, Infallible> { // Call our inner function and unwrap error type into response - match Self::respond_inner(node_server, body).await { + match Self::respond_inner(weak_node, body).await { Ok(body) => Ok(body), Err(body) => Ok(*body), } diff --git a/roslibrust_ros1/src/publisher.rs b/roslibrust_ros1/src/publisher.rs index 4d2a49d8..790c6a06 100644 --- a/roslibrust_ros1/src/publisher.rs +++ b/roslibrust_ros1/src/publisher.rs @@ -15,8 +15,6 @@ use tokio::{ sync::broadcast::{self, error::RecvError}, }; -use super::actor::NodeServerHandle; - /// The regular Publisher representation returned by calling advertise on a [crate::NodeHandle]. pub struct Publisher { // Name of the topic this publisher is publishing on @@ -154,7 +152,7 @@ impl Publication { msg_definition: &str, md5sum: &str, topic_type: &str, - node_handle: NodeServerHandle, + weak_node: std::sync::Weak>, ) -> Result< ( Self, @@ -199,7 +197,7 @@ impl Publication { responding_conn_header, receiver, shutdown_rx, - node_handle, + weak_node, ) .await }); @@ -296,7 +294,7 @@ impl Publication { responding_conn_header: ConnectionHeader, // Header we respond with mut rx: broadcast::Receiver, // Receives messages to publish from the main buffer of messages mut shutdown_rx: tokio::sync::mpsc::Receiver<()>, // Channel to signal to the publication to clean itself up - nh: NodeServerHandle, + nh: std::sync::Weak>, ) { debug!("TCP accept task has started for publication: {topic_name}"); // Store latching message as Bytes for cheap cloning when new subscribers connect @@ -309,7 +307,10 @@ impl Publication { None => debug!("TCP accept task has received shutdown signal for publication: {topic_name}"), } // Notify our Node that we're shutting down - nh.unregister_publisher(&topic_name).await.unwrap(); + if let Some(node_arc) = nh.upgrade() { + let mut node = node_arc.lock().await; + let _ = node.unregister_publisher(&topic_name).await; + } // Exit our loop and shutdown this task break; } diff --git a/roslibrust_ros1/src/service_server.rs b/roslibrust_ros1/src/service_server.rs index 740ef4bd..b9945add 100644 --- a/roslibrust_ros1/src/service_server.rs +++ b/roslibrust_ros1/src/service_server.rs @@ -1,15 +1,15 @@ use std::{ net::{Ipv4Addr, SocketAddr}, - sync::Arc, + sync::{Arc, Weak}, }; use abort_on_drop::ChildTask; use log::*; -use tokio::io::AsyncWriteExt; +use tokio::{io::AsyncWriteExt, sync::Mutex}; use crate::tcpros::{self, ConnectionHeader}; -use super::{names::Name, NodeHandle, TypeErasedCallback}; +use super::{names::Name, node::actor::Node, TypeErasedCallback}; /// ServiceServer is simply a lifetime control /// The underlying ServiceServer is kept alive while object is kept alive. @@ -18,14 +18,14 @@ use super::{names::Name, NodeHandle, TypeErasedCallback}; // Maybe we should just let people manually call an unadvertise_service method? pub struct ServiceServer { service_name: Name, - node_handle: NodeHandle, + weak_node: Weak>, } impl ServiceServer { - pub fn new(service_name: Name, node_handle: NodeHandle) -> Self { + pub fn new(service_name: Name, weak_node: Weak>) -> Self { Self { service_name, - node_handle, + weak_node, } } } @@ -33,9 +33,22 @@ impl ServiceServer { impl Drop for ServiceServer { fn drop(&mut self) { debug!("Dropping service server: {:?}", self.service_name); - let _ = self - .node_handle - .unadvertise_service_server(&self.service_name.to_string()); + // Try to upgrade weak reference - if it fails, the node is already gone + if let Some(node_arc) = self.weak_node.upgrade() { + let service_name = self.service_name.to_string(); + // Spawn a task to do the async cleanup + tokio::spawn(async move { + let mut node = node_arc.lock().await; + if let Err(e) = node.unregister_service_server(&service_name).await { + error!("Failed to unregister service server {service_name}: {e:?}"); + } + }); + } else { + debug!( + "Node already dropped, skipping service unadvertisement for {:?}", + self.service_name + ); + } } } From 343af8765f116ba112d9d2f3c22c7015ed939233 Mon Sep 17 00:00:00 2001 From: carter Date: Sat, 16 May 2026 15:39:31 +0000 Subject: [PATCH 2/5] Stopping point for review of internals cleanup --- roslibrust_ros1/src/master_client.rs | 8 +++ roslibrust_ros1/src/node/actor.rs | 97 +++++++++++++++++++++++++-- roslibrust_ros1/src/node/handle.rs | 7 +- roslibrust_ros1/src/node/mod.rs | 2 +- roslibrust_ros1/src/node/xmlrpc.rs | 27 ++++---- roslibrust_ros1/src/publisher.rs | 11 ++- roslibrust_ros1/src/service_server.rs | 29 +++----- 7 files changed, 129 insertions(+), 52 deletions(-) diff --git a/roslibrust_ros1/src/master_client.rs b/roslibrust_ros1/src/master_client.rs index 503f5336..65177f04 100644 --- a/roslibrust_ros1/src/master_client.rs +++ b/roslibrust_ros1/src/master_client.rs @@ -476,6 +476,14 @@ mod test { .await } + fn test_sync_client() -> SyncMasterClient { + SyncMasterClient::new( + "http://localhost:11311", + "http://localhost:11312", + TEST_NODE_ID, + ) + } + #[test_log::test(tokio::test)] async fn get_system_state() -> Result<(), RosMasterError> { let _state = test_client().await?.get_system_state().await?; diff --git a/roslibrust_ros1/src/node/actor.rs b/roslibrust_ros1/src/node/actor.rs index eaf56e58..55bcefb6 100644 --- a/roslibrust_ros1/src/node/actor.rs +++ b/roslibrust_ros1/src/node/actor.rs @@ -10,7 +10,10 @@ use std::{ collections::HashMap, io, net::Ipv4Addr, - sync::{Arc, Weak}, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, Weak, + }, }; use tokio::sync::{broadcast, mpsc, Mutex}; use tokio_util::sync::CancellationToken; @@ -23,9 +26,78 @@ use tokio_util::sync::CancellationToken; pub(crate) struct NodeServerHandle { // Shared reference to the actual node pub(crate) node: Arc>, + // Shared flag to track if the node is alive + // This is cheaper to check than locking the node + is_alive: Arc, +} + +/// Weak reference to a NodeServerHandle +/// This doesn't keep the Node alive and can be upgraded to a NodeServerHandle +#[derive(Clone)] +pub struct WeakNodeServerHandle { + pub(crate) node: Weak>, + is_alive: Arc, +} + +impl WeakNodeServerHandle { + /// Attempt to upgrade the weak reference to a strong reference + /// Returns None if the Node has already been dropped + pub(crate) fn upgrade(&self) -> Option { + self.node.upgrade().map(|node| NodeServerHandle { + node, + is_alive: self.is_alive.clone(), + }) + } + + /// Attempt to unregister a publisher with the node + /// This is a helper method for use in Drop implementations + /// If the node is already gone, this is a no-op + pub(crate) fn try_unregister_publisher(&self, topic_name: &str) { + if let Some(node_handle) = self.upgrade() { + let topic_name = topic_name.to_string(); + tokio::spawn(async move { + let mut node = node_handle.node.lock().await; + if let Err(e) = node.unregister_publisher(&topic_name).await { + error!("Failed to unregister publisher {topic_name}: {e:?}"); + } + }); + } else { + debug!("Node already dropped, skipping publisher unadvertisement for {topic_name}"); + } + } + + /// Attempt to unregister a service server with the node + /// This is a helper method for use in Drop implementations + /// If the node is already gone, this is a no-op + pub(crate) fn try_unregister_service_server(&self, service_name: &str) { + if let Some(node_handle) = self.upgrade() { + let service_name = service_name.to_string(); + tokio::spawn(async move { + let mut node = node_handle.node.lock().await; + if let Err(e) = node.unregister_service_server(&service_name).await { + error!("Failed to unregister service server {service_name}: {e:?}"); + } + }); + } else { + debug!("Node already dropped, skipping service unadvertisement for {service_name}"); + } + } } impl NodeServerHandle { + /// Create a weak reference to this handle + pub(crate) fn downgrade(&self) -> WeakNodeServerHandle { + WeakNodeServerHandle { + node: Arc::downgrade(&self.node), + is_alive: self.is_alive.clone(), + } + } + + /// Check if the node is still alive + /// This is a cheap operation that doesn't require locking the node + pub(crate) fn is_alive(&self) -> bool { + self.is_alive.load(Ordering::Relaxed) + } /// Get the URI of the client node. pub(crate) async fn get_client_uri(&self) -> Result { let node = self.node.lock().await; @@ -42,7 +114,7 @@ impl NodeServerHandle { latching: bool, ) -> Result<(broadcast::Sender, mpsc::Sender<()>), NodeError> { // Create a weak reference to pass to the publication - let weak_node = Arc::downgrade(&self.node); + let weak_node = self.downgrade(); let mut node = self.node.lock().await; node.register_publisher( topic.to_owned(), @@ -81,7 +153,7 @@ impl NodeServerHandle { }; // Create a weak reference to pass to the publication - let weak_node = Arc::downgrade(&self.node); + let weak_node = self.downgrade(); let mut node = self.node.lock().await; node.register_publisher( topic.to_owned(), @@ -237,6 +309,10 @@ pub(crate) struct Node { // Cancellation token to signal shutdown of background tasks (e.g., XML-RPC server) // When this Node is dropped, it will cancel this token pub(crate) shutdown_token: CancellationToken, + // Atomic flag to track if the node is alive + // Set to true in construction, false in shutdown + // This is shared across all handles to the node + is_alive: Arc, } impl Node { @@ -263,6 +339,7 @@ impl Node { crate::SyncMasterClient::new(master_uri, &client_uri, node_name.to_string()); // Create the node + let is_alive = Arc::new(AtomicBool::new(true)); let node = Self { client: rosmaster_client, sync_client, @@ -273,19 +350,24 @@ impl Node { hostname: hostname.to_owned(), node_name: node_name.to_owned(), shutdown_token: shutdown_token.clone(), + is_alive: is_alive.clone(), }; let node_arc = Arc::new(Mutex::new(node)); + let node_handle = NodeServerHandle { + node: node_arc, + is_alive, + }; // Create a weak reference for the xmlrpc server // This allows the server to access the node without keeping it alive - let weak_node = Arc::downgrade(&node_arc); + let weak_node = node_handle.downgrade(); // Now start serving with the properly initialized node // Pass the weak reference and cancellation token so the server can shut down when signaled bound_xmlrpc.serve(weak_node, shutdown_token); - Ok(NodeServerHandle { node: node_arc }) + Ok(node_handle) } async fn register_subscriber( @@ -328,7 +410,7 @@ impl Node { msg_definition: String, md5sum: String, latching: bool, - weak_node: Weak>, + weak_node: WeakNodeServerHandle, ) -> Result<(broadcast::Sender, mpsc::Sender<()>), NodeError> { // Return handle to existing Publication if it exists let existing_entry = { @@ -502,6 +584,9 @@ impl Node { pub(crate) fn shutdown(&mut self) { debug!("Node shutdown called for: {}", self.node_name); + // Mark the node as no longer alive + self.is_alive.store(false, Ordering::Relaxed); + // Tell xmlrpc server to shut down first self.shutdown_token.cancel(); diff --git a/roslibrust_ros1/src/node/handle.rs b/roslibrust_ros1/src/node/handle.rs index 43586466..60266ca3 100644 --- a/roslibrust_ros1/src/node/handle.rs +++ b/roslibrust_ros1/src/node/handle.rs @@ -53,9 +53,8 @@ impl NodeHandle { /// If this function returns false, the backend node server has shut down and this handle is invalid. /// This state should be unreachable by normal usage of the library. pub fn is_ok(&self) -> bool { - // Check if the Arc still has any strong references - // If it only has weak references, the node has been shut down - Arc::strong_count(&self.inner.node) > 0 + // Check the atomic is_alive flag which is set to false during shutdown + self.inner.is_alive() } /// Returns the network uri of XMLRPC server for the underlying node. @@ -171,7 +170,7 @@ impl NodeHandle { .register_service_server::(&service_name, server) .await?; // Super important: Pass a Weak reference so ServiceServer doesn't keep the node alive - let weak_node = Arc::downgrade(&self.inner.node); + let weak_node = self.inner.downgrade(); Ok(ServiceServer::new(service_name, weak_node)) } diff --git a/roslibrust_ros1/src/node/mod.rs b/roslibrust_ros1/src/node/mod.rs index bf8079b1..08f0a1d4 100644 --- a/roslibrust_ros1/src/node/mod.rs +++ b/roslibrust_ros1/src/node/mod.rs @@ -13,7 +13,7 @@ use std::{ pub(crate) mod actor; mod handle; mod xmlrpc; -use actor::*; +pub use actor::WeakNodeServerHandle; use anyhow::anyhow; pub use handle::NodeHandle; use tokio::sync::{mpsc, oneshot}; diff --git a/roslibrust_ros1/src/node/xmlrpc.rs b/roslibrust_ros1/src/node/xmlrpc.rs index 062d54b6..1464fa9f 100644 --- a/roslibrust_ros1/src/node/xmlrpc.rs +++ b/roslibrust_ros1/src/node/xmlrpc.rs @@ -1,12 +1,9 @@ -use super::Node; use hyper::{Body, Response, StatusCode}; use log::*; use std::{ convert::Infallible, net::{Ipv4Addr, SocketAddr}, - sync::Weak, }; -use tokio::sync::Mutex; use tokio_util::sync::CancellationToken; #[allow(unused)] @@ -48,7 +45,11 @@ impl BoundXmlRpcServer { /// Start serving requests with a weak reference to the node and cancellation token /// The server will shutdown when the cancellation token is cancelled or when the node is dropped - pub fn serve(self, weak_node: Weak>, cancellation_token: CancellationToken) { + pub fn serve( + self, + weak_node: super::actor::WeakNodeServerHandle, + cancellation_token: CancellationToken, + ) { let make_svc = hyper::service::make_service_fn(move |connection| { debug!("New node xmlrpc connection {connection:?}"); let weak_node = weak_node.clone(); @@ -89,12 +90,12 @@ impl XmlRpcServer { // Our actual service handler with our error type async fn respond_inner( - weak_node: Weak>, + weak_node: super::actor::WeakNodeServerHandle, body: hyper::Request, ) -> Result, Box>> { // Try to upgrade the weak reference to a strong reference // If this fails, the node has been dropped and we should error out - let node_arc = weak_node.upgrade().ok_or_else(|| { + let node_handle = weak_node.upgrade().ok_or_else(|| { Box::new(Self::make_error_response( std::io::Error::new(std::io::ErrorKind::NotFound, "Node has been dropped"), "Node no longer exists, shutting down XmlRpc server", @@ -132,7 +133,7 @@ impl XmlRpcServer { match method_name.as_str() { "getMasterUri" => { debug!("getMasterUri called by {args:?}"); - let node = node_arc.lock().await; + let node = node_handle.node.lock().await; let uri = node.client.get_master_uri().to_owned(); Self::to_response(uri) } @@ -147,7 +148,7 @@ impl XmlRpcServer { } "getSubscriptions" => { debug!("getSubscriptions called by {args:?}"); - let node = node_arc.lock().await; + let node = node_handle.node.lock().await; let subs: Vec<(String, String)> = node .subscriptions .iter() @@ -165,7 +166,7 @@ impl XmlRpcServer { } "getPublications" => { debug!("getPublications called by {args:?}"); - let node = node_arc.lock().await; + let node = node_handle.node.lock().await; let pubs: Vec<(String, String)> = node .publishers .iter() @@ -195,7 +196,7 @@ impl XmlRpcServer { ) })?; - let mut node = node_arc.lock().await; + let mut node = node_handle.node.lock().await; if let Some(subscription) = node.subscriptions.get_mut(&topic) { // First, remove any publishers that are no longer in the list subscription.remove_stale_publishers(&publishers).await; @@ -229,7 +230,7 @@ impl XmlRpcServer { let protocols = protocols.iter().flatten().cloned().collect::>(); debug!("Request for topic {topic} from {caller_id} via protocols {protocols:?}"); - let node = node_arc.lock().await; + let node = node_handle.node.lock().await; let params = if protocols.iter().any(|proto| proto.as_str() == "TCPROS") { if let Some((_key, publishing_channel)) = node.publishers.iter().find(|(key, _pub)| *key == &topic) @@ -286,7 +287,7 @@ impl XmlRpcServer { // TODO this is not tested, we need to add a test for this // Trigger shutdown by spawning a task that will lock and shutdown the node - let node_for_shutdown = node_arc.clone(); + let node_for_shutdown = node_handle.node.clone(); tokio::spawn(async move { let mut node_guard = node_for_shutdown.lock().await; node_guard.shutdown(); @@ -374,7 +375,7 @@ impl XmlRpcServer { // Is the actual function we hand to hyper async fn respond( - weak_node: Weak>, + weak_node: super::actor::WeakNodeServerHandle, body: hyper::Request, ) -> Result, Infallible> { // Call our inner function and unwrap error type into response diff --git a/roslibrust_ros1/src/publisher.rs b/roslibrust_ros1/src/publisher.rs index 790c6a06..017ce0b5 100644 --- a/roslibrust_ros1/src/publisher.rs +++ b/roslibrust_ros1/src/publisher.rs @@ -152,7 +152,7 @@ impl Publication { msg_definition: &str, md5sum: &str, topic_type: &str, - weak_node: std::sync::Weak>, + weak_node: crate::node::actor::WeakNodeServerHandle, ) -> Result< ( Self, @@ -294,7 +294,7 @@ impl Publication { responding_conn_header: ConnectionHeader, // Header we respond with mut rx: broadcast::Receiver, // Receives messages to publish from the main buffer of messages mut shutdown_rx: tokio::sync::mpsc::Receiver<()>, // Channel to signal to the publication to clean itself up - nh: std::sync::Weak>, + nh: crate::node::actor::WeakNodeServerHandle, ) { debug!("TCP accept task has started for publication: {topic_name}"); // Store latching message as Bytes for cheap cloning when new subscribers connect @@ -306,11 +306,8 @@ impl Publication { Some(_) => error!("Message should never be sent on this channel"), None => debug!("TCP accept task has received shutdown signal for publication: {topic_name}"), } - // Notify our Node that we're shutting down - if let Some(node_arc) = nh.upgrade() { - let mut node = node_arc.lock().await; - let _ = node.unregister_publisher(&topic_name).await; - } + // Notify our Node that we're shutting down using the unified helper method + nh.try_unregister_publisher(&topic_name); // Exit our loop and shutdown this task break; } diff --git a/roslibrust_ros1/src/service_server.rs b/roslibrust_ros1/src/service_server.rs index b9945add..b7d70d06 100644 --- a/roslibrust_ros1/src/service_server.rs +++ b/roslibrust_ros1/src/service_server.rs @@ -1,15 +1,15 @@ use std::{ net::{Ipv4Addr, SocketAddr}, - sync::{Arc, Weak}, + sync::Arc, }; use abort_on_drop::ChildTask; use log::*; -use tokio::{io::AsyncWriteExt, sync::Mutex}; +use tokio::io::AsyncWriteExt; use crate::tcpros::{self, ConnectionHeader}; -use super::{names::Name, node::actor::Node, TypeErasedCallback}; +use super::{names::Name, node::actor::WeakNodeServerHandle, TypeErasedCallback}; /// ServiceServer is simply a lifetime control /// The underlying ServiceServer is kept alive while object is kept alive. @@ -18,11 +18,11 @@ use super::{names::Name, node::actor::Node, TypeErasedCallback}; // Maybe we should just let people manually call an unadvertise_service method? pub struct ServiceServer { service_name: Name, - weak_node: Weak>, + weak_node: WeakNodeServerHandle, } impl ServiceServer { - pub fn new(service_name: Name, weak_node: Weak>) -> Self { + pub fn new(service_name: Name, weak_node: WeakNodeServerHandle) -> Self { Self { service_name, weak_node, @@ -33,22 +33,9 @@ impl ServiceServer { impl Drop for ServiceServer { fn drop(&mut self) { debug!("Dropping service server: {:?}", self.service_name); - // Try to upgrade weak reference - if it fails, the node is already gone - if let Some(node_arc) = self.weak_node.upgrade() { - let service_name = self.service_name.to_string(); - // Spawn a task to do the async cleanup - tokio::spawn(async move { - let mut node = node_arc.lock().await; - if let Err(e) = node.unregister_service_server(&service_name).await { - error!("Failed to unregister service server {service_name}: {e:?}"); - } - }); - } else { - debug!( - "Node already dropped, skipping service unadvertisement for {:?}", - self.service_name - ); - } + // Use the unified helper method to unregister the service server + self.weak_node + .try_unregister_service_server(&self.service_name.to_string()); } } From ddd267686fe4daec133ca72c2bab2830ef4bdc6b Mon Sep 17 00:00:00 2001 From: carter Date: Sat, 16 May 2026 11:15:24 -0600 Subject: [PATCH 3/5] Clean up testing for ros1 --- .../tests/ros1_native_integration_tests.rs | 593 ------------------ .../tests/ros1_integration_tests.rs | 553 ++++++++++++++++ .../tests/ros1_subcriber_publisher_any.rs | 72 --- roslibrust_rosapi/src/lib.rs | 3 +- 4 files changed, 554 insertions(+), 667 deletions(-) delete mode 100644 roslibrust/tests/ros1_native_integration_tests.rs create mode 100644 roslibrust_ros1/tests/ros1_integration_tests.rs delete mode 100644 roslibrust_ros1/tests/ros1_subcriber_publisher_any.rs diff --git a/roslibrust/tests/ros1_native_integration_tests.rs b/roslibrust/tests/ros1_native_integration_tests.rs deleted file mode 100644 index 9586dba4..00000000 --- a/roslibrust/tests/ros1_native_integration_tests.rs +++ /dev/null @@ -1,593 +0,0 @@ -//! This test file is intended to contain all integration tests of ROS1 native fuctionality. -//! Any test which interacts with actual running ros nodes should be in this file. - -#[cfg(all(feature = "ros1_test", feature = "ros1", feature = "rosbridge"))] -mod tests { - use log::*; - use roslibrust::ros1::{NodeError, NodeHandle}; - use roslibrust::rosbridge::ClientHandle; - use tokio::time::timeout; - - roslibrust_codegen_macro::find_and_generate_ros_messages!( - "assets/ros1_test_msgs", - "assets/ros1_common_interfaces" - ); - - #[test_log::test(tokio::test)] - async fn test_publish_any() { - // publish a single message in raw bytes and test the received message is as expected - let nh = NodeHandle::new("http://localhost:11311", "test_publish_any") - .await - .unwrap(); - - let publisher = nh - .advertise_any( - "/test_publish_any", - "std_msgs/String", - "string data\n", - 1, - true, - ) - .await - .unwrap(); - - let mut subscriber = nh - .subscribe::("/test_publish_any", 1) - .await - .unwrap(); - - let msg_raw: Vec = [8, 0, 0, 0, 4, 0, 0, 0, 116, 101, 115, 116].to_vec(); - publisher.publish(msg_raw).await.unwrap(); - - let res = - tokio::time::timeout(tokio::time::Duration::from_millis(250), subscriber.next()).await; - let msg = res.unwrap().unwrap().unwrap(); - assert_eq!(msg.data, "test"); - } - - #[test_log::test(tokio::test)] - async fn test_subscribe_any() { - // get a single message in raw bytes and test the bytes are as expected - let nh = NodeHandle::new("http://localhost:11311", "test_subscribe_any") - .await - .unwrap(); - - let publisher = nh - .advertise::("/test_subscribe_any", 1, true) - .await - .unwrap(); - - let mut subscriber = nh.subscribe_any("/test_subscribe_any", 1).await.unwrap(); - - publisher - .publish(&std_msgs::String { - data: "test".to_owned(), - }) - .await - .unwrap(); - - let res = - tokio::time::timeout(tokio::time::Duration::from_millis(250), subscriber.next()).await; - let res = res.unwrap().unwrap().unwrap(); - assert!(res == vec![8, 0, 0, 0, 4, 0, 0, 0, 116, 101, 115, 116]); - } - - #[test_log::test(tokio::test)] - async fn test_latching() { - let nh = NodeHandle::new("http://localhost:11311", "test_latching") - .await - .unwrap(); - - // Create a publisher that is latching - let publisher = nh - .advertise::("/test_latching", 1, true) - .await - .unwrap(); - - // Publish message to no one - publisher - .publish(&std_msgs::String { - data: "test".to_owned(), - }) - .await - .unwrap(); - - // Create a subscriber that will connect to the publisher - let mut subscriber = nh - .subscribe::("/test_latching", 1) - .await - .unwrap(); - - // Try to get message from subscriber - let msg = subscriber.next().await.unwrap().unwrap(); - - // Confirm we got the message we published - assert_eq!(msg.data, "test"); - } - - #[test_log::test(tokio::test)] - async fn test_not_latching() { - // Opposite of test_latching, confirms no message appears when latching is false - let nh = NodeHandle::new("http://localhost:11311", "test_not_latching") - .await - .unwrap(); - - let publisher = nh - .advertise::("/test_not_latching", 1, false) - .await - .unwrap(); - - publisher - .publish(&std_msgs::String { - data: "test".to_owned(), - }) - .await - .unwrap(); - - let mut subscriber = nh - .subscribe::("/test_not_latching", 1) - .await - .unwrap(); - - let res = - tokio::time::timeout(tokio::time::Duration::from_millis(250), subscriber.next()).await; - // Should timeout - assert!(res.is_err()); - } - - #[test_log::test(tokio::test)] - async fn test_large_payload_subscriber() { - let nh = NodeHandle::new("http://localhost:11311", "/test_large_payload_subscriber") - .await - .unwrap(); - - let publisher = nh - .advertise::("/large_payload_topic", 1, false) - .await - .unwrap(); - - let mut subscriber = nh - .subscribe::("/large_payload_topic", 1) - .await - .unwrap(); - - // Give some time for subscriber to connect to publisher - tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; - - for _i in 0..10 { - let bytes = vec![0; 10_000]; - publisher - .publish(&test_msgs::RoundTripArrayRequest { - bytes: bytes.clone(), - }) - .await - .unwrap(); - - match subscriber.next().await { - Some(Ok(msg)) => { - assert_eq!(msg.bytes, bytes); - } - Some(Err(e)) => { - panic!("Got error: {e:?}"); - } - None => { - panic!("Got None"); - } - } - } - } - - #[test_log::test(tokio::test)] - async fn test_large_service_payload_client() { - let nh = NodeHandle::new( - "http://localhost:11311", - "test_large_service_payload_client", - ) - .await - .unwrap(); - - // Advertise a service that just echo's the bytes back - let _handle = nh - .advertise_service::("large_service_payload", |request| { - Ok(test_msgs::RoundTripArrayResponse { - bytes: request.bytes, - }) - }) - .await - .unwrap(); - - // Picking random value that should be larger than MTU - // Making sure the ROS message gets split over multiple TCP transactions - // and that we correctly re-assemble it on the other end - let bytes = vec![0; 10_000]; - - info!("Starting service call"); - let response = nh - .service_client::("large_service_payload") - .await - .unwrap() - .call(&test_msgs::RoundTripArrayRequest { - bytes: bytes.clone(), - }) - .await - .unwrap(); - info!("Service call complete"); - - assert_eq!(response.bytes, bytes); - } - - #[test_log::test(tokio::test)] - async fn error_on_unprovided_service() { - let nh = NodeHandle::new("http://localhost:11311", "error_on_unprovided_service") - .await - .unwrap(); - - let client = nh - .service_client::("unprovided_service") - .await; - assert!(client.is_err()); - // Note / TODO: this currently returns an IoError(Kind(ConnectionAborted)) - // which is better than hanging, but not a good error type to return - if !matches!(client, Err(NodeError::IoError(_))) { - panic!("Unexpected error type"); - } - } - - #[test_log::test(tokio::test)] - async fn persistent_client_can_be_called_multiple_times() { - let nh = NodeHandle::new( - "http://localhost:11311", - "/persistent_client_can_be_called_multiple_times", - ) - .await - .unwrap(); - - let server_fn = |request: test_msgs::AddTwoIntsRequest| { - Ok(test_msgs::AddTwoIntsResponse { - sum: request.a + request.b, - }) - }; - - let _handle = nh - .advertise_service::( - "/persistent_client_can_be_called_multiple_times/add_two", - server_fn, - ) - .await - .unwrap(); - - let client = nh - .service_client::( - "/persistent_client_can_be_called_multiple_times/add_two", - ) - .await - .unwrap(); - - for i in 0..10 { - let call: test_msgs::AddTwoIntsResponse = client - .call(&test_msgs::AddTwoIntsRequest { a: 1, b: i }) - .await - .unwrap(); - - assert_eq!(call.sum, 1 + i); - } - } - - #[test_log::test(tokio::test)] - async fn basic_service_server() { - const TIMEOUT: std::time::Duration = std::time::Duration::from_secs(1); - debug!("Getting node handle"); - let nh = NodeHandle::new("http://localhost:11311", "/basic_service_server") - .await - .unwrap(); - - let server_fn = |request: test_msgs::AddTwoIntsRequest| { - info!("Got request: {request:?}"); - Ok(test_msgs::AddTwoIntsResponse { - sum: request.a + request.b, - }) - }; - - // Create the server - debug!("Creating server"); - let _handle = nh - .advertise_service::( - "/basic_service_server/add_two", - server_fn, - ) - .await - .unwrap(); - - // Make the request - debug!("Calling service"); - let call: test_msgs::AddTwoIntsResponse = timeout( - TIMEOUT, - timeout( - TIMEOUT, - nh.service_client::("/basic_service_server/add_two"), - ) - .await - .unwrap() - .unwrap() - .call(&test_msgs::AddTwoIntsRequest { a: 1, b: 2 }), - ) - .await - .unwrap() - .unwrap(); - - assert_eq!(call.sum, 3); - debug!("Got 3"); - } - - #[test_log::test(tokio::test)] - async fn dropping_service_server_kill_correctly() { - debug!("Getting node handle"); - let nh = NodeHandle::new("http://localhost:11311", "/dropping_service_node") - .await - .unwrap(); - - let server_fn = |request: test_msgs::AddTwoIntsRequest| { - info!("Got request: {request:?}"); - Ok(test_msgs::AddTwoIntsResponse { - sum: request.a + request.b, - }) - }; - - // Create the server - let handle = nh - .advertise_service::( - "/dropping_service_node/add_two", - server_fn, - ) - .await - .unwrap(); - - // Make the request (should succeed) - let client = nh - .service_client::("/dropping_service_node/add_two") - .await - .unwrap(); - let _call: test_msgs::AddTwoIntsResponse = client - .call(&test_msgs::AddTwoIntsRequest { a: 1, b: 2 }) - .await - .unwrap(); - - // Shut down the server - std::mem::drop(handle); - // Wait a little bit for server shut down to process - tokio::time::sleep(std::time::Duration::from_millis(250)).await; - - // Make the request again (should fail) - let call_2 = client - .call(&test_msgs::AddTwoIntsRequest { a: 1, b: 2 }) - .await; - debug!("Got call_2: {call_2:?}"); - assert!( - call_2.is_err(), - "Shouldn't be able to call after server is shut down" - ); - - // Create a new clinet - let client = nh - .service_client::("/dropping_service_node/add_two") - .await; - // Client should fail to create as there should be no provider of the service - assert!( - client.is_err(), - "Shouldn't be able to connect again (no provider of service)" - ); - - // Confirm ros master no longer reports our service as provided (via rosapi for fun) - let rosapi_client = nh - .service_client::("/rosapi/services") - .await - .unwrap(); - let service_list: rosapi::ServicesResponse = rosapi_client - .call(&rosapi::ServicesRequest {}) - .await - .unwrap(); - assert!(!service_list - .services - .contains(&"/dropping_service_node/add_two".to_string())); - } - - #[test_log::test(tokio::test)] - async fn service_error_behavior() { - debug!("Getting node handle"); - let nh = NodeHandle::new("http://localhost:11311", "/service_error_behavior") - .await - .unwrap(); - - let server_fn = |request| { - info!("Got request: {request:?}"); - Err(std::io::Error::new(std::io::ErrorKind::NotFound, "test message").into()) - }; - - // Create the server - let _handle = nh - .advertise_service::( - "/service_error_behavior/add_two", - server_fn, - ) - .await - .unwrap(); - - // Make the request (should fail) - let client = nh - .service_client::("/service_error_behavior/add_two") - .await - .unwrap(); - let call = client - .call(&test_msgs::AddTwoIntsRequest { a: 1, b: 2 }) - .await; - // Okay so this is logging the error message correctly, but the contents currently suck: - // "Got call: Err(IoError(Custom { kind: Other, error: "Failure response from service server: Custom { kind: NotFound, error: \"test message\" }" }))" - // We should someday clean up error types here, but frankly errors throughout the entire crate need an overhaul - debug!("Got call: {call:?}"); - assert!(call.is_err()); - } - - #[test_log::test(tokio::test)] - async fn test_dropping_publisher_unadvertises() { - let nh = NodeHandle::new("http://localhost:11311", "/test_dropping_publisher") - .await - .unwrap(); - let publisher = nh - .advertise::("/test_dropping_publisher", 1, false) - .await - .unwrap(); - - let master_client = roslibrust_ros1::MasterClient::new( - "http://localhost:11311", - "NAN", - "/test_dropping_publisher_mc", - ) - .await - .unwrap(); - - let before = master_client.get_published_topics("").await.unwrap(); - debug!("Published topics: {before:?}"); - - assert!(before.contains(&( - "/test_dropping_publisher".to_string(), - "std_msgs/Header".to_string() - ))); - - debug!("Start manual drop"); - // Drop the publisher - std::mem::drop(publisher); - debug!("End manual drop"); - // Give a little time for drop to process - tokio::time::sleep(std::time::Duration::from_millis(100)).await; - // Confirm no longer advertised - let after = master_client.get_published_topics("").await.unwrap(); - assert!(!after.contains(&( - "/test_dropping_publisher".to_string(), - "std_msgs/Header".to_string() - ))); - } - - #[test_log::test(tokio::test)] - #[ntest::timeout(6000)] - #[cfg(feature = "ros1_test")] - async fn topic_provider_publish_functionality_test() { - use roslibrust_common::*; - - // Define a custom "Node" - struct MyClient { - _client: T, - } - - impl MyClient { - async fn test_main( - ros: &T, - msg: &str, - ) -> std::result::Result<(), Box> { - // In the body we'll publish a message - let publisher = ros - .advertise::("/topic_provider_func_test") - .await?; - // Give some time for subscriber to connect in - tokio::time::sleep(std::time::Duration::from_millis(250)).await; - publisher - .publish(&std_msgs::String { data: msg.into() }) - .await?; - // Give some time for publish to process out - tokio::time::sleep(std::time::Duration::from_millis(250)).await; - Ok(()) - } - } - - // Create a ros1 subscriber on our topic - let nh = NodeHandle::new( - "http://localhost:11311", - "/topic_provider_func_test_listener", - ) - .await - .unwrap(); - let mut sub = nh - .subscribe::("/topic_provider_func_test", 1) - .await - .unwrap(); - - // Use our generic node with ROS1 to publish - let nh_clone = nh.clone(); - tokio::spawn(async move { - MyClient::::test_main(&nh_clone, "hello world from ros1").await - }); - - // Confirm we got the message - let msg = sub.next().await.unwrap().unwrap(); - assert_eq!(msg.data, "hello world from ros1"); - - // Use our generic node with rosbridge to publish - tokio::spawn(async move { - MyClient::::test_main( - &ClientHandle::new("ws://localhost:9090").await.unwrap(), - "hello world from rosbridge", - ) - .await - }); - - // Confirm we got the message - let msg = sub.next().await.unwrap().unwrap(); - assert_eq!(msg.data, "hello world from rosbridge"); - } - - /// Test that we correctly purge references to publishers, subscribers and services servers when a node shuts down - #[test_log::test(tokio::test)] - async fn node_cleanup() { - // Create our node - // this nh controls the lifetimes - let nh = NodeHandle::new("http://localhost:11311", "/test_node_cleanup") - .await - .unwrap(); - - // Create pub, sub, and service server to prove all get cleaned up - let _publisher = nh - .advertise::("/test_cleanup_pub", 1, false) - .await - .unwrap(); - - let _subscriber = nh - .subscribe::("/test_cleanup_sub", 1) - .await - .unwrap(); - - let _service_server = nh - .advertise_service::("/test_cleanup_srv", |_req| { - Ok(Default::default()) - }) - .await - .unwrap(); - - let master_client = roslibrust_ros1::MasterClient::new( - "http://localhost:11311", - "NAN", - "/test_node_cleanup_checker", - ) - .await - .unwrap(); - - let data = master_client.get_system_state().await.unwrap(); - info!("Got data before drop: {data:?}"); - - // Check that our three connections are reported by the ros master before starting - assert!(data.is_publishing("/test_cleanup_pub", "/test_node_cleanup")); - assert!(data.is_subscribed("/test_cleanup_sub", "/test_node_cleanup")); - assert!(data.is_service_provider("/test_cleanup_srv", "/test_node_cleanup")); - - // Drop our node handle - std::mem::drop(nh); - debug!("Drop has happened"); - - // Confirm here that Node actually got shut down - let data = master_client.get_system_state().await.unwrap(); - info!("Got data after drop: {data:?}"); - - // Check that our three connections are no longer reported by the ros master after dropping - assert!(!data.is_publishing("/test_cleanup_pub", "/test_node_cleanup")); - assert!(!data.is_subscribed("/test_cleanup_sub", "/test_node_cleanup")); - assert!(!data.is_service_provider("/test_cleanup_srv", "/test_node_cleanup")); - } -} diff --git a/roslibrust_ros1/tests/ros1_integration_tests.rs b/roslibrust_ros1/tests/ros1_integration_tests.rs new file mode 100644 index 00000000..1708a33b --- /dev/null +++ b/roslibrust_ros1/tests/ros1_integration_tests.rs @@ -0,0 +1,553 @@ +//! Comprehensive integration tests for ROS1 functionality +//! +//! These tests require a running ROS master at localhost:11311 +//! Run with: cargo test --package roslibrust_ros1 --features ros1_test + +#[cfg(feature = "ros1_test")] +mod tests { + use roslibrust_ros1::{MasterClient, NodeHandle}; + use roslibrust_test::ros1::{std_msgs, std_srvs}; + use tokio::time::timeout; + + // ============================================================================ + // SECTION: Publisher/Subscriber Tests + // ============================================================================ + + #[test_log::test(tokio::test)] + async fn test_publish_subscribe_typed() { + let nh = NodeHandle::new("http://localhost:11311", "/test_pub_sub_typed") + .await + .unwrap(); + + let publisher = nh + .advertise::("/test_pub_sub_typed_topic", 1, false) + .await + .unwrap(); + + let mut subscriber = nh + .subscribe::("/test_pub_sub_typed_topic", 1) + .await + .unwrap(); + + // Give time for connection + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + publisher + .publish(&std_msgs::String { + data: "hello world".to_owned(), + }) + .await + .unwrap(); + + let res = timeout(tokio::time::Duration::from_millis(500), subscriber.next()).await; + let msg = res.unwrap().unwrap().unwrap(); + assert_eq!(msg.data, "hello world"); + } + + #[test_log::test(tokio::test)] + async fn test_publish_any() { + let nh = NodeHandle::new("http://localhost:11311", "/test_publish_any") + .await + .unwrap(); + + let publisher = nh + .advertise_any( + "/test_publish_any_topic", + "std_msgs/String", + "string data\n", + 1, + true, + ) + .await + .unwrap(); + + let mut subscriber = nh + .subscribe::("/test_publish_any_topic", 1) + .await + .unwrap(); + + // Message body for "test" (without overall length header) + let msg_raw: Vec = vec![8, 0, 0, 0, 4, 0, 0, 0, 116, 101, 115, 116]; + publisher.publish(msg_raw).await.unwrap(); + + let res = timeout(tokio::time::Duration::from_millis(250), subscriber.next()).await; + let msg = res.unwrap().unwrap().unwrap(); + assert_eq!(msg.data, "test"); + } + + #[test_log::test(tokio::test)] + async fn test_subscribe_any() { + let nh = NodeHandle::new("http://localhost:11311", "/test_subscribe_any") + .await + .unwrap(); + + let publisher = nh + .advertise::("/test_subscribe_any_topic", 1, true) + .await + .unwrap(); + + let mut subscriber = nh + .subscribe_any("/test_subscribe_any_topic", 1) + .await + .unwrap(); + + publisher + .publish(&std_msgs::String { + data: "test".to_owned(), + }) + .await + .unwrap(); + + let res = timeout(tokio::time::Duration::from_millis(250), subscriber.next()).await; + let received = res.unwrap().unwrap().unwrap(); + // Expected: [field_length, "test"] + assert_eq!(received, vec![8, 0, 0, 0, 4, 0, 0, 0, 116, 101, 115, 116]); + } + + #[test_log::test(tokio::test)] + async fn test_latching_publisher() { + let nh = NodeHandle::new("http://localhost:11311", "/test_latching") + .await + .unwrap(); + + // Create a latching publisher + let publisher = nh + .advertise::("/test_latching_topic", 1, true) + .await + .unwrap(); + + // Publish message before subscriber exists + publisher + .publish(&std_msgs::String { + data: "latched message".to_owned(), + }) + .await + .unwrap(); + + // Create subscriber after message was published + let mut subscriber = nh + .subscribe::("/test_latching_topic", 1) + .await + .unwrap(); + + // Should receive the latched message + let msg = subscriber.next().await.unwrap().unwrap(); + assert_eq!(msg.data, "latched message"); + } + + #[test_log::test(tokio::test)] + async fn test_non_latching_publisher() { + let nh = NodeHandle::new("http://localhost:11311", "/test_not_latching") + .await + .unwrap(); + + let publisher = nh + .advertise::("/test_not_latching_topic", 1, false) + .await + .unwrap(); + + // Publish before subscriber exists + publisher + .publish(&std_msgs::String { + data: "test".to_owned(), + }) + .await + .unwrap(); + + let mut subscriber = nh + .subscribe::("/test_not_latching_topic", 1) + .await + .unwrap(); + + // Should timeout (message was not latched) + let res = timeout(tokio::time::Duration::from_millis(250), subscriber.next()).await; + assert!(res.is_err()); + } + + #[test_log::test(tokio::test)] + async fn test_large_payload() { + let nh = NodeHandle::new("http://localhost:11311", "/test_large_payload") + .await + .unwrap(); + + let publisher = nh + .advertise::("/test_large_payload_topic", 1, false) + .await + .unwrap(); + + let mut subscriber = nh + .subscribe::("/test_large_payload_topic", 1) + .await + .unwrap(); + + // Give time for connection + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + // Test with 10KB payload (larger than typical MTU) + for _i in 0..5 { + let test_data = vec![42u8; 10_000]; + publisher + .publish(&std_msgs::UInt8MultiArray { + layout: std_msgs::MultiArrayLayout::default(), + data: test_data.clone(), + }) + .await + .unwrap(); + + let msg = subscriber.next().await.unwrap().unwrap(); + assert_eq!(msg.data, test_data); + } + } + + // ============================================================================ + // SECTION: Service Tests + // ============================================================================ + + #[test_log::test(tokio::test)] + async fn test_basic_service() { + let nh = NodeHandle::new("http://localhost:11311", "/test_basic_service") + .await + .unwrap(); + + // Advertise a simple service + let _server = nh + .advertise_service::("/test_basic_service_srv", |req| { + Ok(std_srvs::SetBoolResponse { + success: req.data, + message: if req.data { + "enabled".to_string() + } else { + "disabled".to_string() + }, + }) + }) + .await + .unwrap(); + + // Give time for service to register + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + // Create client and call service + let client = nh + .service_client::("/test_basic_service_srv") + .await + .unwrap(); + + let response = client + .call(&std_srvs::SetBoolRequest { data: true }) + .await + .unwrap(); + + assert!(response.success); + assert_eq!(response.message, "enabled"); + } + + #[test_log::test(tokio::test)] + async fn test_service_multiple_calls() { + let nh = NodeHandle::new("http://localhost:11311", "/test_service_multiple_calls") + .await + .unwrap(); + + let _server = nh + .advertise_service::("/test_multiple_calls_srv", |_req| { + Ok(std_srvs::TriggerResponse { + success: true, + message: "called".to_string(), + }) + }) + .await + .unwrap(); + + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + let client = nh + .service_client::("/test_multiple_calls_srv") + .await + .unwrap(); + + // Call the service multiple times + for _ in 0..10 { + let response = client.call(&std_srvs::TriggerRequest {}).await.unwrap(); + assert!(response.success); + } + } + + #[test_log::test(tokio::test)] + async fn test_service_error() { + let nh = NodeHandle::new("http://localhost:11311", "/test_service_error") + .await + .unwrap(); + + let _server = nh + .advertise_service::("/test_service_error_srv", |_req| { + Err(std::io::Error::new(std::io::ErrorKind::Other, "intentional error").into()) + }) + .await + .unwrap(); + + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + let client = nh + .service_client::("/test_service_error_srv") + .await + .unwrap(); + + let result = client.call(&std_srvs::TriggerRequest {}).await; + assert!(result.is_err()); + } + + #[test_log::test(tokio::test)] + async fn test_service_not_found() { + let nh = NodeHandle::new("http://localhost:11311", "/test_service_not_found") + .await + .unwrap(); + + let result = nh + .service_client::("/nonexistent_service") + .await; + + assert!(result.is_err()); + } + + // ============================================================================ + // SECTION: Drop/Cleanup Tests + // ============================================================================ + + #[test_log::test(tokio::test)] + async fn test_dropping_publisher_unregisters() { + let nh = NodeHandle::new("http://localhost:11311", "/test_drop_publisher") + .await + .unwrap(); + + let publisher = nh + .advertise::("/test_drop_publisher_topic", 1, false) + .await + .unwrap(); + + let master_client = + MasterClient::new("http://localhost:11311", "NAN", "/test_drop_publisher_mc") + .await + .unwrap(); + + let before = master_client.get_published_topics("").await.unwrap(); + assert!(before.contains(&( + "/test_drop_publisher_topic".to_string(), + "std_msgs/Header".to_string() + ))); + + // Drop the publisher + std::mem::drop(publisher); + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + // Verify it's unregistered + let after = master_client.get_published_topics("").await.unwrap(); + assert!(!after.contains(&( + "/test_drop_publisher_topic".to_string(), + "std_msgs/Header".to_string() + ))); + } + + #[test_log::test(tokio::test)] + async fn test_dropping_subscriber_unregisters() { + let nh = NodeHandle::new("http://localhost:11311", "/test_drop_subscriber") + .await + .unwrap(); + + let _publisher = nh + .advertise::("/test_drop_subscriber_topic", 1, false) + .await + .unwrap(); + + let subscriber = nh + .subscribe::("/test_drop_subscriber_topic", 1) + .await + .unwrap(); + + let master_client = + MasterClient::new("http://localhost:11311", "NAN", "/test_drop_subscriber_mc") + .await + .unwrap(); + + let before = master_client.get_system_state().await.unwrap(); + assert!(before.is_subscribed("/test_drop_subscriber_topic", "/test_drop_subscriber")); + + // Drop the subscriber + std::mem::drop(subscriber); + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + let after = master_client.get_system_state().await.unwrap(); + assert!(!after.is_subscribed("/test_drop_subscriber_topic", "/test_drop_subscriber")); + } + + #[test_log::test(tokio::test)] + async fn test_dropping_service_server_unregisters() { + let nh = NodeHandle::new("http://localhost:11311", "/test_drop_service") + .await + .unwrap(); + + let server = nh + .advertise_service::("/test_drop_service_srv", |_req| { + Ok(std_srvs::TriggerResponse { + success: true, + message: "ok".to_string(), + }) + }) + .await + .unwrap(); + + // Verify service works + let client = nh + .service_client::("/test_drop_service_srv") + .await + .unwrap(); + let _response = client.call(&std_srvs::TriggerRequest {}).await.unwrap(); + + // Drop the server + std::mem::drop(server); + tokio::time::sleep(tokio::time::Duration::from_millis(250)).await; + + // Should fail to call + let result = client.call(&std_srvs::TriggerRequest {}).await; + assert!(result.is_err()); + + // Should fail to create new client + let client2 = nh + .service_client::("/test_drop_service_srv") + .await; + assert!(client2.is_err()); + } + + #[test_log::test(tokio::test)] + async fn test_multiple_publishers_partial_drop() { + let nh = NodeHandle::new("http://localhost:11311", "/test_multi_pub") + .await + .unwrap(); + + let publisher1 = nh + .advertise::("/test_multi_pub_topic", 1, false) + .await + .unwrap(); + + let publisher2 = nh + .advertise::("/test_multi_pub_topic", 1, false) + .await + .unwrap(); + + let master_client = + MasterClient::new("http://localhost:11311", "NAN", "/test_multi_pub_mc") + .await + .unwrap(); + + // Drop first publisher + std::mem::drop(publisher1); + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + // Topic should still be advertised + let topics = master_client.get_published_topics("").await.unwrap(); + assert!(topics.contains(&( + "/test_multi_pub_topic".to_string(), + "std_msgs/String".to_string() + ))); + + // publisher2 should still work + publisher2 + .publish(&std_msgs::String { + data: "still works".to_string(), + }) + .await + .unwrap(); + } + + #[test_log::test(tokio::test)] + async fn test_multiple_subscribers_partial_drop() { + let nh = NodeHandle::new("http://localhost:11311", "/test_multi_sub") + .await + .unwrap(); + + let publisher = nh + .advertise::("/test_multi_sub_topic", 1, false) + .await + .unwrap(); + + let subscriber1 = nh + .subscribe::("/test_multi_sub_topic", 1) + .await + .unwrap(); + + let mut subscriber2 = nh + .subscribe::("/test_multi_sub_topic", 1) + .await + .unwrap(); + + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + publisher + .publish(&std_msgs::String { + data: "test1".to_string(), + }) + .await + .unwrap(); + + // Drop first subscriber + std::mem::drop(subscriber1); + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + // Publish another message + publisher + .publish(&std_msgs::String { + data: "test2".to_string(), + }) + .await + .unwrap(); + + // subscriber2 should still receive messages + let msg = timeout(tokio::time::Duration::from_millis(500), subscriber2.next()) + .await + .unwrap() + .unwrap() + .unwrap(); + + assert!(msg.data == "test1" || msg.data == "test2"); + } + + #[test_log::test(tokio::test)] + async fn test_node_full_cleanup() { + let nh = NodeHandle::new("http://localhost:11311", "/test_node_cleanup") + .await + .unwrap(); + + let _publisher = nh + .advertise::("/test_cleanup_pub", 1, false) + .await + .unwrap(); + + let _subscriber = nh + .subscribe::("/test_cleanup_sub", 1) + .await + .unwrap(); + + let _service = nh + .advertise_service::("/test_cleanup_srv", |_| { + Ok(std_srvs::TriggerResponse::default()) + }) + .await + .unwrap(); + + let master_client = MasterClient::new("http://localhost:11311", "NAN", "/test_cleanup_mc") + .await + .unwrap(); + + let before = master_client.get_system_state().await.unwrap(); + assert!(before.is_publishing("/test_cleanup_pub", "/test_node_cleanup")); + assert!(before.is_subscribed("/test_cleanup_sub", "/test_node_cleanup")); + assert!(before.is_service_provider("/test_cleanup_srv", "/test_node_cleanup")); + + // Drop the entire node + std::mem::drop(nh); + + let after = master_client.get_system_state().await.unwrap(); + assert!(!after.is_publishing("/test_cleanup_pub", "/test_node_cleanup")); + assert!(!after.is_subscribed("/test_cleanup_sub", "/test_node_cleanup")); + assert!(!after.is_service_provider("/test_cleanup_srv", "/test_node_cleanup")); + } +} diff --git a/roslibrust_ros1/tests/ros1_subcriber_publisher_any.rs b/roslibrust_ros1/tests/ros1_subcriber_publisher_any.rs deleted file mode 100644 index 4f36677b..00000000 --- a/roslibrust_ros1/tests/ros1_subcriber_publisher_any.rs +++ /dev/null @@ -1,72 +0,0 @@ -//! Integration test for PublisherAny and SubscriberAny round-trip communication - -#[cfg(feature = "ros1_test")] -mod tests { - use roslibrust_ros1::NodeHandle; - use tokio::time::timeout; - - /// Test round-trip publish and subscribe using PublisherAny and SubscriberAny - /// This test verifies that: - /// 1. A PublisherAny can publish raw bytes to a topic - /// 2. A SubscriberAny can receive those raw bytes from the same topic - /// 3. The received bytes match the published bytes exactly - #[test_log::test(tokio::test)] - async fn test_publisher_any_subscriber_any_roundtrip() { - // Create a node handle - let nh = NodeHandle::new( - "http://localhost:11311", - "test_publisher_any_subscriber_any", - ) - .await - .expect("Failed to create node handle"); - - // Create a PublisherAny for std_msgs/String - let publisher = nh - .advertise_any( - "/test_roundtrip", - "std_msgs/String", - "string data\n", - 1, - true, - ) - .await - .expect("Failed to create publisher"); - - // Create a SubscriberAny for the same topic - let mut subscriber = nh - .subscribe_any("/test_roundtrip", 1) - .await - .expect("Failed to create subscriber"); - - // Prepare test data: serialized std_msgs/String with data="hello" - // Note: API doesn't consider the overall message length header as part of message - // Overall length header will be added automatically - // Format: [field_length (4 bytes), data (5 bytes)] - let test_message: Vec = vec![ - 0x05, 0x00, 0x00, 0x00, // field length = 5 - 0x68, 0x65, 0x6c, 0x6c, 0x6f, // "hello" - ]; - - // Publish the message - publisher - .publish(&test_message) - .await - .expect("Failed to publish message"); - - // Subscribe and receive the message with a timeout - let receive_result = - timeout(std::time::Duration::from_millis(500), subscriber.next()).await; - - // Verify we received the message - let received_message = receive_result - .expect("Timeout waiting for message") - .expect("Subscriber returned None") - .expect("Failed to receive message"); - - // Verify the received bytes match the published bytes - assert_eq!( - received_message, test_message, - "Received message does not match published message" - ); - } -} diff --git a/roslibrust_rosapi/src/lib.rs b/roslibrust_rosapi/src/lib.rs index d9533073..15158bb0 100644 --- a/roslibrust_rosapi/src/lib.rs +++ b/roslibrust_rosapi/src/lib.rs @@ -397,8 +397,7 @@ mod test { // Tiny sleep to throttle rate at which tests are run to try to make CI more consistent tokio::time::sleep(std::time::Duration::from_millis(50)).await; let opts = ClientHandleOptions::new("ws://localhost:9090") - // 200 ms failed CI - .timeout(std::time::Duration::from_millis(500)); + .timeout(std::time::Duration::from_millis(1000)); ClientHandle::new_with_options(opts).await.unwrap() } From 5334eb343ec39b23151495575bebcb6216109d0c Mon Sep 17 00:00:00 2001 From: carter Date: Sat, 16 May 2026 11:18:20 -0600 Subject: [PATCH 4/5] lint --- roslibrust_ros1/src/master_client.rs | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/roslibrust_ros1/src/master_client.rs b/roslibrust_ros1/src/master_client.rs index 65177f04..a6dd185b 100644 --- a/roslibrust_ros1/src/master_client.rs +++ b/roslibrust_ros1/src/master_client.rs @@ -517,10 +517,7 @@ mod test { client.register_service(service, service_uri).await.unwrap(); // Confirm it exists - assert_eq!( - client.lookup_service(service).await.unwrap(), - service_uri - ); + assert_eq!(client.lookup_service(service).await.unwrap(), service_uri); // Unregister service assert!(client @@ -539,16 +536,10 @@ mod test { let service = "/my_service_for_testing_registration_sync"; let service_uri = "http://localhost:11312"; // Register - client - .register_service(service, service_uri) - .await - .unwrap(); + client.register_service(service, service_uri).await.unwrap(); // Confirm it exists - assert_eq!( - client.lookup_service(service).await.unwrap(), - service_uri - ); + assert_eq!(client.lookup_service(service).await.unwrap(), service_uri); // Unregister service assert!(sync_client From 7a3b05bb0c1c0815f570b19d09e5813c781da07b Mon Sep 17 00:00:00 2001 From: carter Date: Sat, 16 May 2026 11:32:11 -0600 Subject: [PATCH 5/5] Add protections around dropping a subscriber, publisher, service_server or node handle from a not tokio runtime environment --- roslibrust_ros1/src/node/actor.rs | 60 +++++++++----- .../tests/ros1_integration_tests.rs | 81 +++++++++++++++++++ 2 files changed, 120 insertions(+), 21 deletions(-) diff --git a/roslibrust_ros1/src/node/actor.rs b/roslibrust_ros1/src/node/actor.rs index 1cf65a89..ba0a2082 100644 --- a/roslibrust_ros1/src/node/actor.rs +++ b/roslibrust_ros1/src/node/actor.rs @@ -54,13 +54,19 @@ impl WeakNodeServerHandle { /// If the node is already gone, this is a no-op pub(crate) fn try_unregister_publisher(&self, topic_name: &str) { if let Some(node_handle) = self.upgrade() { - let topic_name = topic_name.to_string(); - tokio::spawn(async move { - let mut node = node_handle.node.lock().await; - if let Err(e) = node.unregister_publisher(&topic_name).await { - error!("Failed to unregister publisher {topic_name}: {e:?}"); - } - }); + // Check if a tokio runtime is available before attempting to spawn + // If the runtime was dropped before this object, we can't spawn the cleanup task + if tokio::runtime::Handle::try_current().is_ok() { + let topic_name = topic_name.to_string(); + tokio::spawn(async move { + let mut node = node_handle.node.lock().await; + if let Err(e) = node.unregister_publisher(&topic_name).await { + error!("Failed to unregister publisher {topic_name}: {e:?}"); + } + }); + } else { + debug!("No tokio runtime available, skipping publisher unadvertisement for {topic_name}"); + } } else { debug!("Node already dropped, skipping publisher unadvertisement for {topic_name}"); } @@ -71,13 +77,19 @@ impl WeakNodeServerHandle { /// If the node is already gone, this is a no-op pub(crate) fn try_unregister_service_server(&self, service_name: &str) { if let Some(node_handle) = self.upgrade() { - let service_name = service_name.to_string(); - tokio::spawn(async move { - let mut node = node_handle.node.lock().await; - if let Err(e) = node.unregister_service_server(&service_name).await { - error!("Failed to unregister service server {service_name}: {e:?}"); - } - }); + // Check if a tokio runtime is available before attempting to spawn + // If the runtime was dropped before this object, we can't spawn the cleanup task + if tokio::runtime::Handle::try_current().is_ok() { + let service_name = service_name.to_string(); + tokio::spawn(async move { + let mut node = node_handle.node.lock().await; + if let Err(e) = node.unregister_service_server(&service_name).await { + error!("Failed to unregister service server {service_name}: {e:?}"); + } + }); + } else { + debug!("No tokio runtime available, skipping service unadvertisement for {service_name}"); + } } else { debug!("Node already dropped, skipping service unadvertisement for {service_name}"); } @@ -88,13 +100,19 @@ impl WeakNodeServerHandle { /// If the node is already gone, this is a no-op pub(crate) fn try_unregister_subscriber(&self, topic_name: &str) { if let Some(node_handle) = self.upgrade() { - let topic_name = topic_name.to_string(); - tokio::spawn(async move { - let mut node = node_handle.node.lock().await; - if let Err(e) = node.unregister_subscriber(&topic_name).await { - error!("Failed to unregister subscriber {topic_name}: {e:?}"); - } - }); + // Check if a tokio runtime is available before attempting to spawn + // If the runtime was dropped before this object, we can't spawn the cleanup task + if tokio::runtime::Handle::try_current().is_ok() { + let topic_name = topic_name.to_string(); + tokio::spawn(async move { + let mut node = node_handle.node.lock().await; + if let Err(e) = node.unregister_subscriber(&topic_name).await { + error!("Failed to unregister subscriber {topic_name}: {e:?}"); + } + }); + } else { + debug!("No tokio runtime available, skipping subscriber unregistration for {topic_name}"); + } } else { debug!("Node already dropped, skipping subscriber unregistration for {topic_name}"); } diff --git a/roslibrust_ros1/tests/ros1_integration_tests.rs b/roslibrust_ros1/tests/ros1_integration_tests.rs index 1708a33b..a5d68b02 100644 --- a/roslibrust_ros1/tests/ros1_integration_tests.rs +++ b/roslibrust_ros1/tests/ros1_integration_tests.rs @@ -550,4 +550,85 @@ mod tests { assert!(!after.is_subscribed("/test_cleanup_sub", "/test_node_cleanup")); assert!(!after.is_service_provider("/test_cleanup_srv", "/test_node_cleanup")); } + + /// Test that dropping a subscriber, publisher, or service server after + /// the tokio runtime has been dropped does not panic. + /// This verifies the fix for the issue where `tokio::spawn` would panic + /// if called from Drop when no runtime exists. + #[test_log::test(test)] + fn test_drop_after_runtime_destroyed() { + // Create objects to be dropped later + let (publisher, subscriber, service_server) = { + // Create a new runtime + let rt = tokio::runtime::Runtime::new().unwrap(); + + // Create objects within the runtime + rt.block_on(async { + let nh = NodeHandle::new("http://localhost:11311", "/test_drop_after_runtime") + .await + .unwrap(); + + let publisher = nh + .advertise::("/test_drop_after_runtime_pub", 1, false) + .await + .unwrap(); + + let subscriber = nh + .subscribe::("/test_drop_after_runtime_sub", 1) + .await + .unwrap(); + + let service_server = nh + .advertise_service::( + "/test_drop_after_runtime_srv", + |_req| { + Ok(std_srvs::TriggerResponse { + success: true, + message: "ok".to_string(), + }) + }, + ) + .await + .unwrap(); + + (publisher, subscriber, service_server) + }) + // Runtime is dropped here when it goes out of scope + }; + + // Now drop the objects after the runtime has been destroyed + // This should NOT panic - it should gracefully skip the unregister + std::mem::drop(publisher); + std::mem::drop(subscriber); + std::mem::drop(service_server); + + // If we get here without panicking, the test passes + } + + /// Test that dropping a NodeHandle after the tokio runtime has been + /// dropped does not panic. + /// This verifies that the Node's Drop implementation (which uses + /// synchronous cleanup) works correctly even after runtime shutdown. + #[test_log::test(test)] + fn test_drop_node_handle_after_runtime_destroyed() { + // Create node handle to be dropped later + let node_handle = { + // Create a new runtime + let rt = tokio::runtime::Runtime::new().unwrap(); + + // Create node handle within the runtime + rt.block_on(async { + NodeHandle::new("http://localhost:11311", "/test_drop_node_after_runtime") + .await + .unwrap() + }) + // Runtime is dropped here when it goes out of scope + }; + + // Now drop the node handle after the runtime has been destroyed + // This should NOT panic - the Node's Drop uses sync cleanup + std::mem::drop(node_handle); + + // If we get here without panicking, the test passes + } }