Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added

- Added the roslibrust_mcap crate which provides utilities for reading and writing MCAP files compatible ROS2 bag tools with ROS message support.
- ROS1 and Zenoh backends now provide an API for checking number of connected subscribers directly at the publisher. This is used to skip serialization if no subscribers are connected and can provide a substantial CPU savings if idle topics.

### Fixed

Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ async fn relay<T: TopicProvider>(ros: T) -> roslibrust::Result<()> {

#[tokio::main]
async fn main() -> roslibrust::Result<()> {
// Experimental support in roslibrust_ros2, not yet released on crates.io
// Relay messages over a native ROS2 connection using Zenoh
// #[cfg(feature = "ros2")]
// {
Expand Down
39 changes: 39 additions & 0 deletions roslibrust_ros1/src/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,24 @@ impl<T: RosMessageType> Publisher<T> {
}
}

/// Checks if there are any connected subscribers.
/// This can be used to skip expensive message construction when no one is listening.
pub fn has_connected_clients(&self) -> bool {
self.sender.receiver_count() > 0
}

/// Queues a message to be sent on the related topic.
// TODO Major this no longer needs to be (or should be) async
pub async fn publish(&self, data: &T) -> Result<(), PublisherError> {
// Skip serialization if there are no connected clients
if !self.has_connected_clients() {
debug!(
"Skipping publish on topic {} - no connected clients",
self.topic_name
);
return Ok(());
}

let size_hint = self.capacity_hint.load(Ordering::Relaxed);
let buffer = bytes::BytesMut::with_capacity(size_hint + 4);
let mut writer = buffer.writer();
Expand Down Expand Up @@ -101,6 +116,12 @@ impl PublisherAny {
}
}

/// Checks if there are any connected subscribers.
/// This can be used to skip expensive message construction when no one is listening.
pub fn has_connected_clients(&self) -> bool {
self.sender.receiver_count() > 0
}

/// Queues a message to be sent on the related topic.
///
/// This expects the data to be the raw bytes of the message body as they would appear going over the wire.
Expand All @@ -114,6 +135,15 @@ impl PublisherAny {
/// - Pre-serialized ROS message data
// TODO this no longer needs to be (or should be) async
pub async fn publish(&self, data: impl AsRef<[u8]>) -> Result<(), PublisherError> {
// Skip if there are no connected clients
if !self.has_connected_clients() {
debug!(
"Skipping publish on topic {} - no connected clients",
self.topic_name
);
return Ok(());
}

// TODO this is a pretty dumb...
// because of the internal channel used for re-direction this future doesn't
// actually complete when the data is sent, but merely when it is queued to be sent
Expand All @@ -133,6 +163,15 @@ impl PublisherAny {
/// as it avoids any copying.
// TODO this no longer needs to be (or should be) async
pub async fn publish_bytes(&self, data: Bytes) -> Result<(), PublisherError> {
// Skip if there are no connected clients
if !self.has_connected_clients() {
debug!(
"Skipping publish on topic {} - no connected clients",
self.topic_name
);
return Ok(());
}

self.sender
.send(data)
.map_err(|_| PublisherError::StreamClosed)?;
Expand Down
22 changes: 22 additions & 0 deletions roslibrust_zenoh/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,30 @@ pub struct ZenohPublisher<T> {
_marker: std::marker::PhantomData<T>,
}

impl<T: RosMessageType> ZenohPublisher<T> {
/// Checks if there are any connected subscribers.
/// This can be used to skip expensive message construction when no one is listening.
pub async fn has_connected_clients(&self) -> bool {
match self.publisher.matching_status().await {
Ok(status) => status.matching(),
Err(e) => {
// If we can't determine the status, assume there might be subscribers
// to avoid dropping messages
warn!("Failed to get matching status: {e:?}, assuming subscribers exist");
true
}
}
}
}

impl<T: RosMessageType> Publish<T> for ZenohPublisher<T> {
async fn publish(&self, data: &T) -> Result<()> {
// Skip serialization if there are no connected clients
if !self.has_connected_clients().await {
debug!("Skipping publish - no connected clients");
return Ok(());
}

let size_hint = self.capacity_hint.load(Ordering::Relaxed);
let mut bytes = Vec::with_capacity(size_hint);
roslibrust_serde_rosmsg::to_writer_skip_length(&mut bytes, data).map_err(|e| {
Expand Down
Loading