diff --git a/Cargo.lock b/Cargo.lock index 9cb6758..78fe051 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -69,6 +69,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "aligned-vec" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc890384c8602f339876ded803c97ad529f3842aba97f6392b3dba0dd171769b" +dependencies = [ + "equator", +] + [[package]] name = "allocator-api2" version = "0.2.21" @@ -248,17 +257,6 @@ dependencies = [ "syn 2.0.114", ] -[[package]] -name = "atty" -version = "0.2.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" -dependencies = [ - "hermit-abi 0.1.19", - "libc", - "winapi", -] - [[package]] name = "autocfg" version = "1.5.0" @@ -535,18 +533,6 @@ dependencies = [ "inout", ] -[[package]] -name = "clap" -version = "3.2.25" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ea181bf566f71cb9a5d17a59e1871af638180a18fb0035c92ae62b705207123" -dependencies = [ - "bitflags 1.3.2", - "clap_lex 0.2.4", - "indexmap 1.9.3", - "textwrap", -] - [[package]] name = "clap" version = "4.5.56" @@ -565,7 +551,7 @@ checksum = "793207c7fa6300a0608d1080b858e5fdbe713cdc1c8db9fb17777d8a13e63df0" dependencies = [ "anstream", "anstyle", - "clap_lex 0.7.7", + "clap_lex", "strsim", ] @@ -581,15 +567,6 @@ dependencies = [ "syn 2.0.114", ] -[[package]] -name = "clap_lex" -version = "0.2.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2850f2f5a82cbf437dd5af4d49848fbdfc27c157c3d010345776f952765261c5" -dependencies = [ - "os_str_bytes", -] - [[package]] name = "clap_lex" version = "0.7.7" @@ -706,34 +683,6 @@ dependencies = [ "cfg-if", ] -[[package]] -name = "criterion" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e7c76e09c1aae2bc52b3d2f29e13c6572553b30c4aa1b8a49fd70de6412654cb" -dependencies = [ - "anes", - "atty", - "cast", - "ciborium", - "clap 3.2.25", - "criterion-plot", - "futures", - "itertools 0.10.5", - "lazy_static", - "num-traits", - "oorandom", - "plotters", - "rayon", - "regex", - "serde", - "serde_derive", - "serde_json", - "tinytemplate", - "tokio", - "walkdir", -] - [[package]] name = "criterion" version = "0.5.1" @@ -743,8 +692,9 @@ dependencies = [ "anes", "cast", "ciborium", - "clap 4.5.56", + "clap", "criterion-plot", + "futures", "is-terminal", "itertools 0.10.5", "num-traits", @@ -757,6 +707,7 @@ dependencies = [ "serde_derive", "serde_json", "tinytemplate", + "tokio", "walkdir", ] @@ -1108,6 +1059,26 @@ dependencies = [ "log", ] +[[package]] +name = "equator" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4711b213838dfee0117e3be6ac926007d7f433d7bbe33595975d4190cb07e6fc" +dependencies = [ + "equator-macro", +] + +[[package]] +name = "equator-macro" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44f23cf4b44bfce11a86ace86f8a73ffdec849c9fd00a386a53d278bd9e81fb3" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.114", +] + [[package]] name = "equivalent" version = "1.0.2" @@ -1536,15 +1507,6 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" -[[package]] -name = "hermit-abi" -version = "0.1.19" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33" -dependencies = [ - "libc", -] - [[package]] name = "hermit-abi" version = "0.5.2" @@ -1895,7 +1857,7 @@ version = "0.4.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3640c1c38b8e4e43584d8df18be5fc6b0aa314ce6ebf51b53313d4306cca8e46" dependencies = [ - "hermit-abi 0.5.2", + "hermit-abi", "libc", "windows-sys 0.61.2", ] @@ -2209,9 +2171,9 @@ checksum = "f52b00d39961fc5b2736ea853c9cc86238e165017a493d1d5c8eac6bdc4cc273" [[package]] name = "memmap2" -version = "0.5.10" +version = "0.9.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "83faa42c0a078c393f6b29d5db232d8be22776a891f8f56e5284faee4a20b327" +checksum = "714098028fe011992e1c3962653c96b2d578c4b4bce9036e15ff220319b1e0e3" dependencies = [ "libc", ] @@ -2505,7 +2467,7 @@ version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "91df4bbde75afed763b708b7eee1e8e7651e02d97f6d5dd763e89367e957b23b" dependencies = [ - "hermit-abi 0.5.2", + "hermit-abi", "libc", ] @@ -2601,12 +2563,6 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" -[[package]] -name = "os_str_bytes" -version = "6.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2355d85b9a3786f481747ced0e0ff2ba35213a1f9bd406ed906554d7af805a1" - [[package]] name = "overload" version = "0.1.1" @@ -2898,24 +2854,25 @@ checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" [[package]] name = "pprof" -version = "0.11.1" +version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "196ded5d4be535690899a4631cc9f18cdc41b7ebf24a79400f46f48e49a11059" +checksum = "38a01da47675efa7673b032bf8efd8214f1917d89685e07e395ab125ea42b187" dependencies = [ + "aligned-vec", "backtrace", "cfg-if", - "criterion 0.4.0", + "criterion", "findshlibs", "inferno", "libc", "log", "nix 0.26.4", "once_cell", - "parking_lot", "smallvec", + "spin 0.10.0", "symbolic-demangle", "tempfile", - "thiserror 1.0.69", + "thiserror 2.0.18", ] [[package]] @@ -3368,7 +3325,7 @@ version = "0.1.0" source = "git+https://github.com/ZettaScaleLabs/ros-z.git#e4fdecc8dac71977093ce5ed666dab852789eb1a" dependencies = [ "byteorder", - "clap 4.5.56", + "clap", "csv", "dashmap", "event-listener", @@ -3481,7 +3438,7 @@ dependencies = [ name = "roslibrust_genmsg" version = "0.20.0" dependencies = [ - "clap 4.5.56", + "clap", "const_format", "env_logger 0.11.8", "itertools 0.12.1", @@ -3499,7 +3456,7 @@ version = "0.1.0" dependencies = [ "anyhow", "cdr", - "criterion 0.5.1", + "criterion", "env_logger 0.11.8", "log", "mcap", @@ -3601,9 +3558,9 @@ dependencies = [ [[package]] name = "roslibrust_serde_rosmsg" -version = "0.5.1" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be06f0fda624d8f8088d2231ef562e585edf1d57f7d5bb9db3f05ff219118b9c" +checksum = "166ab59c2890a665716249a3fb144c5bd20e901b5524f846100ad4ba448e0165" dependencies = [ "byteorder", "error-chain", @@ -3614,7 +3571,7 @@ dependencies = [ name = "roslibrust_test" version = "0.1.0" dependencies = [ - "criterion 0.4.0", + "criterion", "diffy", "env_logger 0.10.2", "hex", @@ -4314,6 +4271,9 @@ name = "spin" version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d5fe4ccb98d9c292d56fec89a5e07da7fc4cf0dc11e156b41793132775d3e591" +dependencies = [ + "lock_api", +] [[package]] name = "spki" @@ -4441,9 +4401,9 @@ checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" [[package]] name = "symbolic-common" -version = "10.2.1" +version = "12.18.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b55cdc318ede251d0957f07afe5fed912119b8c1bc5a7804151826db999e737" +checksum = "332615d90111d8eeaf86a84dc9bbe9f65d0d8c5cf11b4caccedc37754eb0dcfd" dependencies = [ "debugid", "memmap2", @@ -4453,9 +4413,9 @@ dependencies = [ [[package]] name = "symbolic-demangle" -version = "10.2.1" +version = "12.18.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79be897be8a483a81fff6a3a4e195b4ac838ef73ca42d348b3f722da9902e489" +checksum = "912017718eb4d21930546245af9a3475c9dccf15675a5c215664e76621afc471" dependencies = [ "cpp_demangle", "rustc-demangle", @@ -4572,12 +4532,6 @@ dependencies = [ "syn 2.0.114", ] -[[package]] -name = "textwrap" -version = "0.16.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c13547615a44dc9c452a8a534638acdf07120d4b6847c8178705da06306a3057" - [[package]] name = "thiserror" version = "1.0.69" diff --git a/Cargo.toml b/Cargo.toml index 968c01f..82b68e8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,5 +27,5 @@ tokio = {version = "1", features = ["full", "test-util"] } serde = { version = "1.0", features = ["derive"] } # Someday we may move this crate into this workspace # For now this is how we keep from repeating the verison everywhere -roslibrust_serde_rosmsg = "0.5.1" +roslibrust_serde_rosmsg = "0.7.0" test-log = "0.2" diff --git a/roslibrust_ros1/src/publisher.rs b/roslibrust_ros1/src/publisher.rs index 017ce0b..948d573 100644 --- a/roslibrust_ros1/src/publisher.rs +++ b/roslibrust_ros1/src/publisher.rs @@ -3,12 +3,14 @@ use crate::{ tcpros::{self, ConnectionHeader}, }; use abort_on_drop::ChildTask; -use bytes::Bytes; +use bytes::{BufMut, Bytes}; use log::*; use roslibrust_common::RosMessageType; use std::{ + io::Write, marker::PhantomData, net::{Ipv4Addr, SocketAddr}, + sync::atomic::{AtomicUsize, Ordering}, }; use tokio::{ io::AsyncWriteExt, @@ -27,6 +29,8 @@ pub struct Publisher { _shutdown_channel: tokio::sync::mpsc::Sender<()>, // Phantom data to ensure that the type is known at compile time phantom: PhantomData, + // Used to track buffer capacity size to minimize allocations + capacity_hint: AtomicUsize, } impl Publisher { @@ -40,20 +44,30 @@ impl Publisher { sender, _shutdown_channel: shutdown_channel, phantom: PhantomData, + // Default capacity size, after this we use the size of the previous message +32 + capacity_hint: 1024.into(), } } /// Queues a message to be sent on the related topic. // TODO Major this no longer needs to be (or should be) async pub async fn publish(&self, data: &T) -> Result<(), PublisherError> { - let data = roslibrust_serde_rosmsg::to_vec(&data)?; - // TODO this is a pretty dumb... - // because of the internal channel used for re-direction this future doesn't - // actually complete when the data is sent, but merely when it is queued to be sent - // This function could probably be non-async - // Or we should do some significant re-work to have it only yield when the data is sent. + let size_hint = self.capacity_hint.load(Ordering::Relaxed); + let buffer = bytes::BytesMut::with_capacity(size_hint + 4); + let mut writer = buffer.writer(); + // Write empty u32 for size + writer.write(&[0, 0, 0, 0]).unwrap(); + roslibrust_serde_rosmsg::to_writer_skip_length(&mut writer, data)?; + let mut buffer = writer.into_inner(); + // Patch size back to front of buffer + let size = buffer.len() as u32 - 4; + if size > size_hint as u32 { + self.capacity_hint.store(size as usize, Ordering::Relaxed); + } + buffer[0..4].copy_from_slice(&size.to_le_bytes()); + let bytes = buffer.freeze(); self.sender - .send(data.into()) + .send(bytes) .map_err(|_| PublisherError::StreamClosed)?; debug!("Publishing data on topic {}", self.topic_name); Ok(()) diff --git a/roslibrust_ros1/src/subscriber.rs b/roslibrust_ros1/src/subscriber.rs index 3134024..3c97eba 100644 --- a/roslibrust_ros1/src/subscriber.rs +++ b/roslibrust_ros1/src/subscriber.rs @@ -301,6 +301,7 @@ async fn publisher_reader_task( ) { let mut retry_period = INITIAL_RETRY_PERIOD; let mut tcp_endpoint: Option = None; + let mut receive_capacity_hint = 1024; 'connection_loop: loop { // Check for cancellation before attempting connection @@ -384,12 +385,15 @@ async fn publisher_reader_task( log::debug!("Publisher reader task for {publisher_uri} cancelled during read"); break 'connection_loop; } - result = tcpros::receive_body(&mut stream) => { + result = tcpros::receive_body_with_capacity_hint(&mut stream, receive_capacity_hint) => { match result { Ok(body) => { trace!( "Subscription to {topic_name} received message from {publisher_uri}" ); + if body.len() > receive_capacity_hint { + receive_capacity_hint = body.len(); + } if sender.send(body).is_err() { log::error!( "Unable to send message data due to dropped channel, closing connection to {publisher_uri}" diff --git a/roslibrust_ros1/src/tcpros.rs b/roslibrust_ros1/src/tcpros.rs index d4d0547..2d67692 100644 --- a/roslibrust_ros1/src/tcpros.rs +++ b/roslibrust_ros1/src/tcpros.rs @@ -270,18 +270,44 @@ pub async fn receive_header(stream: &mut TcpStream) -> Result Result { + receive_body_with_capacity_hint(stream, 1024).await +} + +/// Reads the body of a message from the given stream, using a previously observed +/// message size as the initial allocation. +/// +/// This is useful for large fixed-shape streams such as images: after the first +/// message, the receive buffer can usually be allocated at the final size before +/// the length prefix has been read. +pub async fn receive_body_with_capacity_hint( + stream: &mut TcpStream, + capacity_hint: usize, +) -> Result { use bytes::{BufMut, BytesMut}; use tokio::io::AsyncReadExt; - let body_len = stream.read_u32_le().await? as usize; + let mut buf = BytesMut::with_capacity(capacity_hint.max(4)); + + while buf.len() < 4 { + let remaining = 4 - buf.len(); + let n = stream.read_buf(&mut (&mut buf).limit(remaining)).await?; + if n == 0 { + return Err(std::io::Error::from(std::io::ErrorKind::UnexpectedEof)); + } + } + + let body_len = + u32::from_le_bytes(buf[..4].try_into().expect("length prefix is four bytes")) as usize; let total_len = 4 + body_len; - let mut buf = BytesMut::with_capacity(total_len); - buf.put_u32_le(body_len as u32); // len == 4, capacity == 4 + body_len + if buf.capacity() < total_len { + buf.reserve(total_len - buf.capacity()); + } // Read until we have read the full body while buf.len() < total_len { - let n = stream.read_buf(&mut buf).await?; + let remaining = total_len - buf.len(); + let n = stream.read_buf(&mut (&mut buf).limit(remaining)).await?; if n == 0 { return Err(std::io::Error::from(std::io::ErrorKind::UnexpectedEof)); } diff --git a/roslibrust_test/Cargo.toml b/roslibrust_test/Cargo.toml index 1066c46..a6bc587 100644 --- a/roslibrust_test/Cargo.toml +++ b/roslibrust_test/Cargo.toml @@ -13,8 +13,8 @@ log = { workspace = true } [dev-dependencies] diffy = "0.3.0" -criterion = { version = "0.4", features = ["html_reports", "async_tokio"] } -pprof = { version = "0.11", features = ["flamegraph", "criterion"] } +criterion = { version = "0.5", features = ["html_reports", "async_tokio"] } +pprof = { version = "0.15", features = ["flamegraph", "criterion"] } test-log = { workspace = true } hex = "0.4" diff --git a/roslibrust_test/cpp_image_bench/CMakeLists.txt b/roslibrust_test/cpp_image_bench/CMakeLists.txt new file mode 100644 index 0000000..e4c3625 --- /dev/null +++ b/roslibrust_test/cpp_image_bench/CMakeLists.txt @@ -0,0 +1,21 @@ +cmake_minimum_required(VERSION 3.0.2) +project(roslibrust_cpp_image_bench) + +if(NOT CMAKE_BUILD_TYPE) + set(CMAKE_BUILD_TYPE Release CACHE STRING "Build type" FORCE) +endif() + +find_package(catkin REQUIRED COMPONENTS + roscpp + sensor_msgs + std_msgs +) + +catkin_package() + +include_directories(${catkin_INCLUDE_DIRS}) + +add_executable(cpp_image_bench src/image_bench.cpp) +target_compile_features(cpp_image_bench PRIVATE cxx_std_17) +target_compile_options(cpp_image_bench PRIVATE -O3 -DNDEBUG) +target_link_libraries(cpp_image_bench ${catkin_LIBRARIES}) diff --git a/roslibrust_test/cpp_image_bench/README.md b/roslibrust_test/cpp_image_bench/README.md new file mode 100644 index 0000000..3b8d2e7 --- /dev/null +++ b/roslibrust_test/cpp_image_bench/README.md @@ -0,0 +1,70 @@ +# ROS1 C++ Image Benchmark + +This catkin package provides a roscpp comparison point for `roslibrust_test/benches/image_bench.rs`. + +The default message matches the Rust benchmark: + +- topic: `/image_bench` +- queue size: `1` +- latching: `false` +- image: `1920x1080`, `rgb8`, `3` bytes per pixel +- payload: `6,220,800` bytes + +## Build + +From any catkin workspace: + +```bash +mkdir -p ~/cpp_image_bench_ws/src +ln -s /roslibrust/roslibrust_test/cpp_image_bench ~/cpp_image_bench_ws/src/ +cd ~/cpp_image_bench_ws +source /opt/ros/noetic/setup.bash +catkin_make -DCMAKE_BUILD_TYPE=Release +source devel/setup.bash +``` + +## Same-Process Benchmark + +This is the closest shape to `image_bench.rs`, with one process that advertises and subscribes to the same topic: + +```bash +roscore +``` + +In another shell: + +```bash +source /opt/ros/noetic/setup.bash +source ~/cpp_image_bench_ws/devel/setup.bash +rosrun roslibrust_cpp_image_bench cpp_image_bench --mode same_process --iterations 1000 --warmup 25 +``` + +Important: roscpp may use intra-process delivery in this mode, so this can be much faster than real TCPROS transport. + +## TCPROS Benchmark + +Use this mode for a better estimate of roscpp image transport across a real ROS1 publisher/subscriber connection. The reported timing is image publish to tiny ack receipt, so the extra ack cost is included but normally small next to the image transfer. + +Terminal 1: + +```bash +roscore +``` + +Terminal 2: + +```bash +source /opt/ros/noetic/setup.bash +source ~/cpp_image_bench_ws/devel/setup.bash +rosrun roslibrust_cpp_image_bench cpp_image_bench --mode responder +``` + +Terminal 3: + +```bash +source /opt/ros/noetic/setup.bash +source ~/cpp_image_bench_ws/devel/setup.bash +rosrun roslibrust_cpp_image_bench cpp_image_bench --mode driver --iterations 1000 --warmup 25 +``` + +The benchmark prints a final `json: {...}` line with nanosecond timings and payload throughput. diff --git a/roslibrust_test/cpp_image_bench/package.xml b/roslibrust_test/cpp_image_bench/package.xml new file mode 100644 index 0000000..c4a0151 --- /dev/null +++ b/roslibrust_test/cpp_image_bench/package.xml @@ -0,0 +1,19 @@ + + + roslibrust_cpp_image_bench + 0.1.0 + ROS1 roscpp image roundtrip benchmark comparable to roslibrust_test image_bench. + + Carter Schultz + MIT + + catkin + + roscpp + sensor_msgs + std_msgs + + roscpp + sensor_msgs + std_msgs + diff --git a/roslibrust_test/cpp_image_bench/src/image_bench.cpp b/roslibrust_test/cpp_image_bench/src/image_bench.cpp new file mode 100644 index 0000000..33be8b5 --- /dev/null +++ b/roslibrust_test/cpp_image_bench/src/image_bench.cpp @@ -0,0 +1,449 @@ +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +namespace { + +using Clock = std::chrono::steady_clock; +using Ns = std::chrono::nanoseconds; + +struct Options { + std::string mode = "same_process"; + std::string topic = "/image_bench"; + std::string ack_topic = "/image_bench_ack"; + int iterations = 1000; + int warmup = 25; + int queue_size = 1; + int width = 1920; + int height = 1080; + int channels = 3; + std::string encoding = "rgb8"; + bool tcp_no_delay = false; + double connect_timeout_s = 10.0; + double message_timeout_s = 5.0; +}; + +struct Stats { + double mean_ns = 0.0; + double median_ns = 0.0; + double p95_ns = 0.0; + double min_ns = 0.0; + double max_ns = 0.0; + double stddev_ns = 0.0; +}; + +void usage(const char* argv0) { + std::cerr + << "Usage: " << argv0 << " [--mode same_process|driver|responder] [options]\n" + << "\n" + << "Options:\n" + << " --iterations N Measured iterations, default 1000\n" + << " --warmup N Warmup iterations ignored in stats, default 25\n" + << " --width N Image width, default 1920\n" + << " --height N Image height, default 1080\n" + << " --channels N Bytes per pixel, default 3\n" + << " --encoding NAME Image encoding, default rgb8\n" + << " --queue-size N ROS pub/sub queue size, default 1\n" + << " --topic NAME Image topic, default /image_bench\n" + << " --ack-topic NAME Ack topic for driver/responder mode, default /image_bench_ack\n" + << " --tcp-no-delay Use ros::TransportHints().tcpNoDelay()\n" + << " --connect-timeout S Connection wait timeout, default 10\n" + << " --message-timeout S Per-message wait timeout, default 5\n"; +} + +int parse_int(const std::string& value, const std::string& name) { + char* end = nullptr; + const long parsed = std::strtol(value.c_str(), &end, 10); + if (end == value.c_str() || *end != '\0' || parsed > std::numeric_limits::max() || + parsed < std::numeric_limits::min()) { + throw std::runtime_error("Invalid integer for " + name + ": " + value); + } + return static_cast(parsed); +} + +double parse_double(const std::string& value, const std::string& name) { + char* end = nullptr; + const double parsed = std::strtod(value.c_str(), &end); + if (end == value.c_str() || *end != '\0' || !std::isfinite(parsed)) { + throw std::runtime_error("Invalid number for " + name + ": " + value); + } + return parsed; +} + +Options parse_options(int argc, char** argv) { + Options options; + for (int i = 1; i < argc; ++i) { + const std::string arg = argv[i]; + auto need_value = [&](const std::string& name) -> std::string { + if (i + 1 >= argc) { + throw std::runtime_error("Missing value for " + name); + } + return argv[++i]; + }; + + if (arg == "--help" || arg == "-h") { + usage(argv[0]); + std::exit(0); + } else if (arg == "--mode") { + options.mode = need_value(arg); + } else if (arg == "--iterations") { + options.iterations = parse_int(need_value(arg), arg); + } else if (arg == "--warmup") { + options.warmup = parse_int(need_value(arg), arg); + } else if (arg == "--width") { + options.width = parse_int(need_value(arg), arg); + } else if (arg == "--height") { + options.height = parse_int(need_value(arg), arg); + } else if (arg == "--channels") { + options.channels = parse_int(need_value(arg), arg); + } else if (arg == "--encoding") { + options.encoding = need_value(arg); + } else if (arg == "--queue-size") { + options.queue_size = parse_int(need_value(arg), arg); + } else if (arg == "--topic") { + options.topic = need_value(arg); + } else if (arg == "--ack-topic") { + options.ack_topic = need_value(arg); + } else if (arg == "--tcp-no-delay") { + options.tcp_no_delay = true; + } else if (arg == "--connect-timeout") { + options.connect_timeout_s = parse_double(need_value(arg), arg); + } else if (arg == "--message-timeout") { + options.message_timeout_s = parse_double(need_value(arg), arg); + } else { + throw std::runtime_error("Unknown option: " + arg); + } + } + + if (options.mode != "same_process" && options.mode != "driver" && options.mode != "responder") { + throw std::runtime_error("Unsupported mode: " + options.mode); + } + if (options.iterations <= 0 || options.warmup < 0 || options.queue_size <= 0 || + options.width <= 0 || options.height <= 0 || options.channels <= 0) { + throw std::runtime_error("iterations, queue-size, width, height, and channels must be positive"); + } + return options; +} + +sensor_msgs::Image make_image(const Options& options) { + sensor_msgs::Image image; + image.header.seq = 0; + image.height = static_cast(options.height); + image.width = static_cast(options.width); + image.encoding = options.encoding; + image.is_bigendian = 0; + image.step = static_cast(options.width * options.channels); + image.data.assign(static_cast(image.step) * image.height, 0); + return image; +} + +Stats compute_stats(std::vector values) { + if (values.empty()) { + throw std::runtime_error("No samples collected"); + } + + Stats stats; + stats.min_ns = *std::min_element(values.begin(), values.end()); + stats.max_ns = *std::max_element(values.begin(), values.end()); + stats.mean_ns = std::accumulate(values.begin(), values.end(), 0.0) / values.size(); + + double variance = 0.0; + for (const double value : values) { + const double delta = value - stats.mean_ns; + variance += delta * delta; + } + stats.stddev_ns = std::sqrt(variance / values.size()); + + std::sort(values.begin(), values.end()); + const auto percentile = [&](double p) { + const double idx = p * static_cast(values.size() - 1); + const auto lo = static_cast(std::floor(idx)); + const auto hi = static_cast(std::ceil(idx)); + const double frac = idx - static_cast(lo); + return values[lo] * (1.0 - frac) + values[hi] * frac; + }; + + stats.median_ns = percentile(0.50); + stats.p95_ns = percentile(0.95); + return stats; +} + +ros::TransportHints transport_hints(const Options& options) { + ros::TransportHints hints; + if (options.tcp_no_delay) { + hints.tcpNoDelay(); + } + return hints; +} + +bool wait_for_connections(const std::function& ready, double timeout_s) { + const ros::Time deadline = ros::Time::now() + ros::Duration(timeout_s); + ros::Rate rate(100.0); + while (ros::ok() && ros::Time::now() < deadline) { + if (ready()) { + return true; + } + rate.sleep(); + } + return ready(); +} + +void settle_connections() { + ros::Duration(0.1).sleep(); +} + +void print_results(const Options& options, const Stats& stats, size_t payload_bytes) { + const double mean_s = stats.mean_ns / 1.0e9; + const double mib_per_s = (static_cast(payload_bytes) / (1024.0 * 1024.0)) / mean_s; + const double gb_per_s = (static_cast(payload_bytes) / 1.0e9) / mean_s; + + std::cout << std::fixed << std::setprecision(3); + std::cout << "mode: " << options.mode << "\n"; + std::cout << "samples: " << options.iterations << " measured, " << options.warmup << " warmup\n"; + std::cout << "image: " << options.width << "x" << options.height << " " << options.encoding + << ", payload_bytes: " << payload_bytes << "\n"; + std::cout << "mean: " << stats.mean_ns / 1000000.0 << " ms\n"; + std::cout << "median: " << stats.median_ns / 1000000.0 << " ms\n"; + std::cout << "p95: " << stats.p95_ns / 1000000.0 << " ms\n"; + std::cout << "min: " << stats.min_ns / 1000000.0 << " ms\n"; + std::cout << "max: " << stats.max_ns / 1000000.0 << " ms\n"; + std::cout << "stddev: " << stats.stddev_ns / 1000000.0 << " ms\n"; + std::cout << "throughput_payload: " << mib_per_s << " MiB/s (" << gb_per_s << " GB/s)\n"; + + std::cout << std::setprecision(6); + std::cout << "json: {" + << "\"mode\":\"" << options.mode << "\"," + << "\"iterations\":" << options.iterations << "," + << "\"warmup\":" << options.warmup << "," + << "\"width\":" << options.width << "," + << "\"height\":" << options.height << "," + << "\"channels\":" << options.channels << "," + << "\"encoding\":\"" << options.encoding << "\"," + << "\"queue_size\":" << options.queue_size << "," + << "\"tcp_no_delay\":" << (options.tcp_no_delay ? "true" : "false") << "," + << "\"payload_bytes\":" << payload_bytes << "," + << "\"mean_ns\":" << stats.mean_ns << "," + << "\"median_ns\":" << stats.median_ns << "," + << "\"p95_ns\":" << stats.p95_ns << "," + << "\"min_ns\":" << stats.min_ns << "," + << "\"max_ns\":" << stats.max_ns << "," + << "\"stddev_ns\":" << stats.stddev_ns << "," + << "\"throughput_mib_s\":" << mib_per_s << "," + << "\"throughput_gb_s\":" << gb_per_s << "}\n"; +} + +class SequenceWaiter { + public: + void mark(uint32_t seq) { + std::lock_guard lock(mutex_); + last_seq_ = seq; + cv_.notify_all(); + } + + bool wait(uint32_t seq, double timeout_s) { + std::unique_lock lock(mutex_); + return cv_.wait_for(lock, std::chrono::duration(timeout_s), + [&] { return last_seq_ >= seq; }); + } + + bool wait_spinning(uint32_t seq, double timeout_s) { + const auto deadline = Clock::now() + std::chrono::duration(timeout_s); + while (ros::ok() && Clock::now() < deadline) { + { + std::lock_guard lock(mutex_); + if (last_seq_ >= seq) { + return true; + } + } + ros::spinOnce(); + std::this_thread::yield(); + } + + std::lock_guard lock(mutex_); + return last_seq_ >= seq; + } + + private: + std::mutex mutex_; + std::condition_variable cv_; + uint32_t last_seq_ = 0; +}; + +int run_same_process(const Options& options) { + ros::NodeHandle nh; + SequenceWaiter waiter; + uint64_t checksum = 0; + + ros::Publisher publisher = + nh.advertise(options.topic, options.queue_size, false); + boost::function image_cb = + [&](const sensor_msgs::ImageConstPtr& msg) { + checksum += msg->data.empty() ? 0 : msg->data.back(); + waiter.mark(msg->header.seq); + }; + ros::Subscriber subscriber = nh.subscribe( + options.topic, options.queue_size, image_cb, ros::VoidConstPtr(), transport_hints(options)); + + if (!wait_for_connections([&] { return publisher.getNumSubscribers() > 0; }, + options.connect_timeout_s)) { + throw std::runtime_error("Timed out waiting for same-process subscriber connection"); + } + settle_connections(); + + sensor_msgs::Image image = make_image(options); + image.header.seq = 1; + const auto prime_deadline = Clock::now() + std::chrono::duration(options.message_timeout_s); + while (ros::ok() && Clock::now() < prime_deadline) { + publisher.publish(image); + if (waiter.wait_spinning(image.header.seq, 0.01)) { + break; + } + ros::Duration(0.01).sleep(); + } + if (!waiter.wait_spinning(image.header.seq, 0.0)) { + throw std::runtime_error("Timed out priming same-process image delivery"); + } + + std::vector timings; + timings.reserve(static_cast(options.iterations)); + + const int total = options.warmup + options.iterations; + for (int i = 0; ros::ok() && i < total; ++i) { + image.header.seq = static_cast(i + 2); + const auto start = Clock::now(); + publisher.publish(image); + if (!waiter.wait_spinning(image.header.seq, options.message_timeout_s)) { + throw std::runtime_error("Timed out waiting for image callback"); + } + const auto end = Clock::now(); + if (i >= options.warmup) { + timings.push_back(static_cast(std::chrono::duration_cast(end - start).count())); + } + } + + const Stats stats = compute_stats(std::move(timings)); + print_results(options, stats, image.data.size()); + std::cerr << "checksum: " << checksum << "\n"; + return 0; +} + +int run_driver(const Options& options) { + ros::NodeHandle nh; + SequenceWaiter ack_waiter; + + ros::Publisher image_pub = + nh.advertise(options.topic, options.queue_size, false); + boost::function ack_cb = + [&](const std_msgs::UInt32ConstPtr& msg) { ack_waiter.mark(msg->data); }; + ros::Subscriber ack_sub = nh.subscribe( + options.ack_topic, options.queue_size, ack_cb, ros::VoidConstPtr(), transport_hints(options)); + + if (!wait_for_connections( + [&] { return image_pub.getNumSubscribers() > 0 && ack_sub.getNumPublishers() > 0; }, + options.connect_timeout_s)) { + throw std::runtime_error("Timed out waiting for responder connections"); + } + settle_connections(); + + sensor_msgs::Image image = make_image(options); + image.header.seq = 1; + const auto prime_deadline = Clock::now() + std::chrono::duration(options.message_timeout_s); + while (ros::ok() && Clock::now() < prime_deadline) { + image_pub.publish(image); + if (ack_waiter.wait_spinning(image.header.seq, 0.01)) { + break; + } + ros::Duration(0.01).sleep(); + } + if (!ack_waiter.wait_spinning(image.header.seq, 0.0)) { + throw std::runtime_error("Timed out priming responder image delivery"); + } + + std::vector timings; + timings.reserve(static_cast(options.iterations)); + + const int total = options.warmup + options.iterations; + for (int i = 0; ros::ok() && i < total; ++i) { + image.header.seq = static_cast(i + 2); + const auto start = Clock::now(); + image_pub.publish(image); + if (!ack_waiter.wait_spinning(image.header.seq, options.message_timeout_s)) { + throw std::runtime_error("Timed out waiting for ack"); + } + const auto end = Clock::now(); + if (i >= options.warmup) { + timings.push_back(static_cast(std::chrono::duration_cast(end - start).count())); + } + } + + const Stats stats = compute_stats(std::move(timings)); + print_results(options, stats, image.data.size()); + return 0; +} + +int run_responder(const Options& options) { + ros::NodeHandle nh; + uint64_t received = 0; + uint64_t checksum = 0; + + ros::Publisher ack_pub = + nh.advertise(options.ack_topic, options.queue_size, false); + boost::function image_cb = + [&](const sensor_msgs::ImageConstPtr& msg) { + checksum += msg->data.empty() ? 0 : msg->data.back(); + ++received; + std_msgs::UInt32 ack; + ack.data = msg->header.seq; + ack_pub.publish(ack); + }; + ros::Subscriber image_sub = nh.subscribe( + options.topic, options.queue_size, image_cb, ros::VoidConstPtr(), transport_hints(options)); + + ROS_INFO_STREAM("Responder ready on " << options.topic << ", publishing acks on " + << options.ack_topic); + ros::spin(); + std::cerr << "received: " << received << ", checksum: " << checksum << "\n"; + return 0; +} + +} // namespace + +int main(int argc, char** argv) { + try { + const Options options = parse_options(argc, argv); + std::string node_name = "cpp_image_bench_" + options.mode; + ros::init(argc, argv, node_name, ros::init_options::AnonymousName); + + if (options.mode == "same_process") { + return run_same_process(options); + } + if (options.mode == "driver") { + return run_driver(options); + } + return run_responder(options); + } catch (const std::exception& e) { + std::cerr << "error: " << e.what() << "\n"; + usage(argv[0]); + return 1; + } +}