From 6113a759ad6a5318701b189f3bfd64570c899118 Mon Sep 17 00:00:00 2001 From: bharatgoswami Date: Wed, 29 Apr 2026 13:09:55 +0200 Subject: [PATCH 1/7] Rust::com change the return type of async Receive * Change return type from result to tuple --- .../basic-consumer-producer.rs | 63 ++++++++----------- .../com-api-concept/com_api_concept.rs | 8 ++- .../com-api/com-api-runtime-lola/consumer.rs | 37 +++++++---- .../com-api/com-api-runtime-mock/runtime.rs | 16 ++--- .../com/impl/rust/com-api/com-api/com_api.rs | 3 +- .../consumer_async_apis/consumer_app.rs | 42 ++++++------- 6 files changed, 80 insertions(+), 89 deletions(-) diff --git a/score/mw/com/example/com-api-example/basic-consumer-producer.rs b/score/mw/com/example/com-api-example/basic-consumer-producer.rs index d867bab98..001993776 100644 --- a/score/mw/com/example/com-api-example/basic-consumer-producer.rs +++ b/score/mw/com/example/com-api-example/basic-consumer-producer.rs @@ -328,33 +328,25 @@ mod test { for attempt in 0..MAX_ATTEMPTS { println!("[RECEIVER] Attempt {}", attempt); - // Match and immediately reassign in all branches - sample_buf = match subscribed.receive(sample_buf, 1, 3).await { - Ok(returned_buf) => { - let count = returned_buf.sample_count(); - - if count > 0 { - total_received += count; - println!( - "[RECEIVER] Received {} samples (total: {})", - count, total_received - ); - - // Create a mutable version to pop from - let mut buf = returned_buf; - while let Some(sample) = buf.pop_front() { - println!("[RECEIVER] Sample: {:.2} psi", sample.pressure); - } - buf - } else { - returned_buf - } - } - Err(e) => { + // Destructure tuple - container is always returned even on error + let (returned_buf, result) = subscribed.receive(sample_buf, 1, 3).await; + sample_buf = { + let count = returned_buf.sample_count(); + if let Err(e) = result { println!("[RECEIVER] Error on attempt {}: {:?}", attempt, e); - // Create a fresh buffer if there's an error - SampleContainer::new(5) + } else if count > 0 { + total_received += count; + println!( + "[RECEIVER] Received {} samples (total: {})", + count, total_received + ); } + // Drain printed samples + let mut buf = returned_buf; + while let Some(sample) = buf.pop_front() { + println!("[RECEIVER] Sample: {:.2} psi", sample.pressure); + } + buf }; } @@ -399,24 +391,21 @@ mod test { println!("[RECEIVER] Async data processor started"); let mut buffer = SampleContainer::new(5); for _ in 0..5 { - buffer = match subscribed.receive(buffer, 2, 3).await { - Ok(returned_buf) => { + let (returned_buf, result) = subscribed.receive(buffer, 2, 3).await; + buffer = { + if let Err(e) = result { + println!("[RECEIVER] Error receiving data: {:?}", e); + } else { let count = returned_buf.sample_count(); if count > 0 { println!("[RECEIVER] Received {} samples", count); - let mut buf = returned_buf; - while let Some(sample) = buf.pop_front() { - println!("[RECEIVER] Sample: {:.2} psi", sample.pressure); - } - buf - } else { - returned_buf } } - Err(e) => { - println!("[RECEIVER] Error receiving data: {:?}", e); - SampleContainer::new(5) + let mut buf = returned_buf; + while let Some(sample) = buf.pop_front() { + println!("[RECEIVER] Sample: {:.2} psi", sample.pressure); } + buf } } } diff --git a/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs b/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs index ac1862c46..35134c17a 100644 --- a/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs +++ b/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs @@ -828,8 +828,10 @@ pub trait Subscription { /// buffer and transferred to the container /// /// # Returns - /// Future that resolves to the number of newly added events to the container with at least - /// `new_samples` number of new events. + /// Future that resolves to `(SampleContainer>, Result<()>)`. + /// The container is **always** returned (even on error or cancellation) so the caller + /// never loses samples that were collected before the future resolved. + /// `Ok(())` means at least `new_samples` were received; `Err(_)` describes the failure. /// /// # Important Notes /// User can not concurrenly call `receive` on the same subscription instance from @@ -849,7 +851,7 @@ pub trait Subscription { scratch: SampleContainer>, new_samples: usize, max_samples: usize, - ) -> impl Future>>> + 'a; + ) -> impl Future>, Result<()>)> + 'a; } /// A trait for types that can be default-constructed in place, skipping intermediate moves. diff --git a/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs b/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs index 73c8f90f5..fa0c816bd 100644 --- a/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs +++ b/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs @@ -543,15 +543,18 @@ where scratch: SampleContainer>, new_samples: usize, max_samples: usize, - ) -> impl Future>>> + 'a { + ) -> impl Future>, Result<()>)> + 'a { async move { if max_samples > self.max_num_samples || new_samples > self.max_num_samples { - return Err(Error::ReceiveError( - ReceiveFailedReason::SampleCountOutOfBounds { - max: self.max_num_samples, - requested: max_samples.max(new_samples), - }, - )); + return ( + scratch, + Err(Error::ReceiveError( + ReceiveFailedReason::SampleCountOutOfBounds { + max: self.max_num_samples, + requested: max_samples.max(new_samples), + }, + )), + ); } // Get the event guard to ensure no concurrent receive calls // on the same subscriber instance. @@ -592,7 +595,7 @@ struct ReceiveFuture<'a, T: CommData + Debug> { } impl<'a, T: CommData + Debug> Future for ReceiveFuture<'a, T> { - type Output = Result>>; + type Output = (SampleContainer>, Result<()>); fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll { // Extract all immutable values upfront to avoid borrow conflicts with self in the callback @@ -617,6 +620,7 @@ impl<'a, T: CommData + Debug> Future for ReceiveFuture<'a, T> { self.scratch = Some(scratch); result } else { + self.scratch = Some(scratch); Err(Error::ReceiveError(ReceiveFailedReason::ReceiveError)) } } else { @@ -632,15 +636,22 @@ impl<'a, T: CommData + Debug> Future for ReceiveFuture<'a, T> { //event_guard will be dropped here, allowing new receive calls to access the // proxy event self.event_guard = None; - return Poll::Ready(Ok(self - .scratch - .take() - .expect("SampleContainer is not available when returning Future result"))); + return Poll::Ready(( + self.scratch + .take() + .expect("SampleContainer is not available when returning Future result"), + Ok(()), + )); } // Have some samples but not enough yet, wait for more via waker Poll::Pending } - Err(e) => Poll::Ready(Err(e)), + Err(e) => Poll::Ready(( + self.scratch + .take() + .expect("SampleContainer unavailable on error; was receive polled after completion?"), + Err(e), + )), } } } diff --git a/score/mw/com/impl/rust/com-api/com-api-runtime-mock/runtime.rs b/score/mw/com/impl/rust/com-api/com-api-runtime-mock/runtime.rs index d099256f3..f52e20d33 100644 --- a/score/mw/com/impl/rust/com-api/com-api-runtime-mock/runtime.rs +++ b/score/mw/com/impl/rust/com-api/com-api-runtime-mock/runtime.rs @@ -333,7 +333,7 @@ where _scratch: SampleContainer>, _new_samples: usize, _max_samples: usize, - ) -> impl Future>>> + 'a { + ) -> impl Future>, Result<()>)> + 'a { async { todo!() } } } @@ -531,16 +531,10 @@ mod test { let test_subscriber = super::SubscriberImpl::::new(); // block on an asynchronous reception of data from test_subscriber futures::executor::block_on(async { - let mut sample_buf = SampleContainer::new(); - match test_subscriber.receive(&mut sample_buf, 1, 1).await { - Ok(0) => panic!("No sample received"), - Ok(x) => { - println!( - "{} samples received: sample[0] = {}", - x, - *sample_buf.front().unwrap() - ) - } + let sample_buf = SampleContainer::new(1); + let (_returned_buf, result) = test_subscriber.receive(sample_buf, 1, 1).await; + match result { + Ok(()) => {} Err(e) => panic!("{:?}", e), } }) diff --git a/score/mw/com/impl/rust/com-api/com-api/com_api.rs b/score/mw/com/impl/rust/com-api/com-api/com_api.rs index ba093a015..4be3e66f1 100644 --- a/score/mw/com/impl/rust/com-api/com-api/com_api.rs +++ b/score/mw/com/impl/rust/com-api/com-api/com_api.rs @@ -114,7 +114,8 @@ //! } //! //! // 6. Or wait asynchronously for events -//! let n = subscription.receive(&mut container, 1, 3).await?; +//! let (container, result) = subscription.receive(container, 1, 3).await; +//! result?; //! ``` //! # Further reading //! - `com_api_concept` crate — trait definitions and full API documentation diff --git a/score/mw/com/test/basic_rust_api/consumer_async_apis/consumer_app.rs b/score/mw/com/test/basic_rust_api/consumer_async_apis/consumer_app.rs index 309e926ca..9c787140b 100644 --- a/score/mw/com/test/basic_rust_api/consumer_async_apis/consumer_app.rs +++ b/score/mw/com/test/basic_rust_api/consumer_async_apis/consumer_app.rs @@ -89,33 +89,27 @@ async fn async_main() { let mut sample_buf = SampleContainer::new(MAX_SAMPLES_PER_CALL); // `receive` awaits until at least `min_samples` (1) are available. while received_total < num_cycles { - sample_buf = match subscription + let (returned_buf, result) = subscription .receive(sample_buf, 1, MAX_SAMPLES_PER_CALL) - .await - { - Ok(returned_buf) => { - let count = returned_buf.sample_count(); - if count > 0 { - let mut buf = returned_buf; - for _ in 0..count { - if let Some(sample) = buf.pop_front() { - println!("[bigdata-consumer] Received sample x={}", sample.x); - } - } - received_total += count; - println!( - "[bigdata-consumer] Progress: {}/{}", - received_total, num_cycles - ); - buf - } else { - returned_buf + .await; + if let Err(e) = result { + eprintln!("[bigdata-consumer] Receive error: {:?}", e); + return; + } + sample_buf = { + let count = returned_buf.sample_count(); + let mut buf = returned_buf; + for _ in 0..count { + if let Some(sample) = buf.pop_front() { + println!("[bigdata-consumer] Received sample x={}", sample.x); } } - Err(e) => { - eprintln!("[bigdata-consumer] Receive error: {:?}", e); - return; - } + received_total += count; + println!( + "[bigdata-consumer] Progress: {}/{}", + received_total, num_cycles + ); + buf }; } From 8f5b1c72583707d56761a2583dab6ea0172d216d Mon Sep 17 00:00:00 2001 From: bharatgoswami Date: Wed, 29 Apr 2026 13:46:01 +0200 Subject: [PATCH 2/7] Rust::com Added async receive API with timeout parameter * Added async receive API with timepout parameter --- .../com-api-concept/com_api_concept.rs | 8 ++++ .../com-api/com-api-runtime-lola/consumer.rs | 37 ++++++++++++++----- .../com-api/com-api-runtime-mock/runtime.rs | 11 ++++++ 3 files changed, 46 insertions(+), 10 deletions(-) diff --git a/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs b/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs index 35134c17a..258574352 100644 --- a/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs +++ b/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs @@ -852,6 +852,14 @@ pub trait Subscription { new_samples: usize, max_samples: usize, ) -> impl Future>, Result<()>)> + 'a; + + fn receive_with_timeout<'a>( + &'a self, + scratch: SampleContainer>, + new_samples: usize, + max_samples: usize, + timeout_future: impl Future + 'a, + ) -> impl Future>, Result<()>)> + 'a; } /// A trait for types that can be default-constructed in place, skipping intermediate moves. diff --git a/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs b/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs index fa0c816bd..b6fbc15ae 100644 --- a/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs +++ b/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs @@ -30,7 +30,7 @@ #![allow(clippy::needless_lifetimes)] use crate::Debug; -use core::future::Future; +use core::future::{self, Future}; use core::marker::PhantomData; use core::mem::ManuallyDrop; use core::ops::{Deref, DerefMut}; @@ -258,8 +258,9 @@ impl NativeProxyEventBase { pub fn new(proxy: &NonNull, interface_id: &str, identifier: &str) -> Result { //SAFETY: It is safe as we are passing valid proxy pointer and interface id to get event // proxy pointer is created during consumer creation - let raw_event_ptr = - unsafe { bridge_ffi_rs::get_event_from_proxy(proxy.as_ptr(), interface_id, identifier) }; + let raw_event_ptr = unsafe { + bridge_ffi_rs::get_event_from_proxy(proxy.as_ptr(), interface_id, identifier) + }; let proxy_event_ptr = std::ptr::NonNull::new(raw_event_ptr) .ok_or(Error::EventError(EventFailedReason::EventCreationFailed))?; Ok(Self { proxy_event_ptr }) @@ -535,14 +536,30 @@ where ) } + #[allow(clippy::manual_async_fn)] + fn receive<'a>( + &'a self, + scratch: SampleContainer>, + new_samples: usize, + max_samples: usize, + ) -> impl Future>, Result<()>)> + 'a { + self.receive_with_timeout( + scratch, + new_samples, + max_samples, + core::future::pending::<()>(), + ) + } + // Cannot use `async fn` because the trait mandates `-> impl Future + 'a`, // requiring the returned future to be explicitly bound to the lifetime of `&self`. #[allow(clippy::manual_async_fn)] - fn receive<'a>( + fn receive_with_timeout<'a>( &'a self, scratch: SampleContainer>, new_samples: usize, max_samples: usize, + timeout_future: impl Future + 'a, ) -> impl Future>, Result<()>)> + 'a { async move { if max_samples > self.max_num_samples || new_samples > self.max_num_samples { @@ -637,9 +654,9 @@ impl<'a, T: CommData + Debug> Future for ReceiveFuture<'a, T> { // proxy event self.event_guard = None; return Poll::Ready(( - self.scratch - .take() - .expect("SampleContainer is not available when returning Future result"), + self.scratch.take().expect( + "SampleContainer is not available when returning Future result", + ), Ok(()), )); } @@ -647,9 +664,9 @@ impl<'a, T: CommData + Debug> Future for ReceiveFuture<'a, T> { Poll::Pending } Err(e) => Poll::Ready(( - self.scratch - .take() - .expect("SampleContainer unavailable on error; was receive polled after completion?"), + self.scratch.take().expect( + "SampleContainer unavailable on error; was receive polled after completion?", + ), Err(e), )), } diff --git a/score/mw/com/impl/rust/com-api/com-api-runtime-mock/runtime.rs b/score/mw/com/impl/rust/com-api/com-api-runtime-mock/runtime.rs index f52e20d33..a0a86d962 100644 --- a/score/mw/com/impl/rust/com-api/com-api-runtime-mock/runtime.rs +++ b/score/mw/com/impl/rust/com-api/com-api-runtime-mock/runtime.rs @@ -336,6 +336,17 @@ where ) -> impl Future>, Result<()>)> + 'a { async { todo!() } } + + #[allow(clippy::manual_async_fn)] + fn receive_with_timeout<'a>( + &'a self, + _scratch: SampleContainer>, + _new_samples: usize, + _max_samples: usize, + _timeout_future: impl Future + 'a, + ) -> impl Future>, Result<()>)> + 'a { + async { todo!() } + } } pub struct Publisher { From e6d08ee6dc31b482a5e7a78d456aa8b7af53f14f Mon Sep 17 00:00:00 2001 From: bharatgoswami Date: Thu, 30 Apr 2026 06:30:22 +0200 Subject: [PATCH 3/7] Rust::com Implementation for async_receive_with_timeout * Added implementation for receive timeout api --- .../com-api-concept/com_api_concept.rs | 2 +- .../rust/com-api/com-api-concept/error.rs | 2 ++ .../com-api/com-api-runtime-lola/consumer.rs | 19 ++++++++++++++++--- .../com-api/com-api-runtime-mock/runtime.rs | 2 +- 4 files changed, 20 insertions(+), 5 deletions(-) diff --git a/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs b/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs index 258574352..5a9d66273 100644 --- a/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs +++ b/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs @@ -858,7 +858,7 @@ pub trait Subscription { scratch: SampleContainer>, new_samples: usize, max_samples: usize, - timeout_future: impl Future + 'a, + timeout_future: impl Future + Unpin + 'a, ) -> impl Future>, Result<()>)> + 'a; } diff --git a/score/mw/com/impl/rust/com-api/com-api-concept/error.rs b/score/mw/com/impl/rust/com-api/com-api-concept/error.rs index ef7c2fd93..97acc5f29 100644 --- a/score/mw/com/impl/rust/com-api/com-api-concept/error.rs +++ b/score/mw/com/impl/rust/com-api/com-api-concept/error.rs @@ -73,6 +73,8 @@ pub enum ReceiveFailedReason { BufferUnavailable, #[error("Sample size out of bounds, expected at most {max}, but got {requested}")] SampleCountOutOfBounds { max: usize, requested: usize }, + #[error("Receive operation was cancelled or timed out")] + Cancelled, } /// Comprehensive error reasons for event-related failures diff --git a/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs b/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs index b6fbc15ae..1e774ae96 100644 --- a/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs +++ b/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs @@ -559,7 +559,7 @@ where scratch: SampleContainer>, new_samples: usize, max_samples: usize, - timeout_future: impl Future + 'a, + timeout_future: impl Future + Unpin + 'a, ) -> impl Future>, Result<()>)> + 'a { async move { if max_samples > self.max_num_samples || new_samples > self.max_num_samples { @@ -590,6 +590,7 @@ where new_samples, max_samples, total_received: 0, + timeout_future, } .await } @@ -601,7 +602,7 @@ where // a waker storage for async notifications, and parameters for managing the receive operation. // The Future implementation for ReceiveFuture defines the polling logic, // which attempts to receive samples and manages the state of the receive operation. -struct ReceiveFuture<'a, T: CommData + Debug> { +struct ReceiveFuture<'a, T: CommData + Debug, C: Future + Unpin> { event_guard: Option>, waker_storage: Arc, max_num_samples: usize, @@ -609,9 +610,10 @@ struct ReceiveFuture<'a, T: CommData + Debug> { new_samples: usize, max_samples: usize, total_received: usize, + timeout_future: C, } -impl<'a, T: CommData + Debug> Future for ReceiveFuture<'a, T> { +impl<'a, T: CommData + Debug, C: Future + Unpin> Future for ReceiveFuture<'a, T, C> { type Output = (SampleContainer>, Result<()>); fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll { @@ -621,6 +623,17 @@ impl<'a, T: CommData + Debug> Future for ReceiveFuture<'a, T> { let max_num_samples = self.max_num_samples; let total_received = self.total_received; + // Poll cancel/timeout future first — C: Unpin so Pin::new is safe, no allocation + if Pin::new(&mut self.timeout_future).poll(ctx).is_ready() { + self.event_guard = None; + return Poll::Ready(( + self.scratch + .take() + .expect("SampleContainer missing on timeout/cancel"), + Err(Error::ReceiveError(ReceiveFailedReason::Cancelled)), + )); + } + // Register the current waker to be notified when new samples arrive via FFI callback self.waker_storage.register(ctx.waker()); diff --git a/score/mw/com/impl/rust/com-api/com-api-runtime-mock/runtime.rs b/score/mw/com/impl/rust/com-api/com-api-runtime-mock/runtime.rs index a0a86d962..8662c304e 100644 --- a/score/mw/com/impl/rust/com-api/com-api-runtime-mock/runtime.rs +++ b/score/mw/com/impl/rust/com-api/com-api-runtime-mock/runtime.rs @@ -343,7 +343,7 @@ where _scratch: SampleContainer>, _new_samples: usize, _max_samples: usize, - _timeout_future: impl Future + 'a, + _timeout_future: impl Future + Unpin + 'a, ) -> impl Future>, Result<()>)> + 'a { async { todo!() } } From 13f8fdb44dce731257e08067807a78c0d5915d5b Mon Sep 17 00:00:00 2001 From: bharatgoswami Date: Thu, 30 Apr 2026 13:14:30 +0200 Subject: [PATCH 4/7] Rust::com Example file update for receive timeout * Updated example test file --- .../basic-consumer-producer.rs | 83 ++++++++++++++----- .../com-api-concept/com_api_concept.rs | 2 +- .../rust/com-api/com-api-concept/error.rs | 2 + .../com-api/com-api-runtime-lola/consumer.rs | 17 +++- .../com-api/com-api-runtime-mock/runtime.rs | 5 +- 5 files changed, 83 insertions(+), 26 deletions(-) diff --git a/score/mw/com/example/com-api-example/basic-consumer-producer.rs b/score/mw/com/example/com-api-example/basic-consumer-producer.rs index 001993776..58e4f7102 100644 --- a/score/mw/com/example/com-api-example/basic-consumer-producer.rs +++ b/score/mw/com/example/com-api-example/basic-consumer-producer.rs @@ -368,7 +368,7 @@ mod test { let uninit_sample = match offered_producer.left_tire.allocate() { Ok(sample) => sample, Err(e) => { - eprintln!("Failed to allocate sample: {:?}", e); + eprintln!("[SENDER] Failed to allocate sample: {:?}", e); continue; } }; @@ -377,9 +377,9 @@ mod test { }); match sample.send() { Ok(_) => (), - Err(e) => eprintln!("Failed to send sample: {:?}", e), + Err(e) => eprintln!("[SENDER] Failed to send sample: {:?}", e), } - println!("Sent sample with pressure: {}", 1.0 + i as f32); + println!("[SENDER] Sent sample with pressure: {}", 1.0 + i as f32); tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; } offered_producer @@ -387,26 +387,32 @@ mod test { //receiver function which use async receive to get data, it waits for new data and process it once it arrives, //it will receive data 10 times and print the received samples - async fn async_data_processor_fn(subscribed: impl Subscription) { + async fn async_data_processor_fn( + subscribed: impl Subscription + Send + 'static, + is_timeout: bool, + ) { println!("[RECEIVER] Async data processor started"); let mut buffer = SampleContainer::new(5); for _ in 0..5 { - let (returned_buf, result) = subscribed.receive(buffer, 2, 3).await; - buffer = { - if let Err(e) = result { - println!("[RECEIVER] Error receiving data: {:?}", e); - } else { - let count = returned_buf.sample_count(); - if count > 0 { - println!("[RECEIVER] Received {} samples", count); - } - } - let mut buf = returned_buf; - while let Some(sample) = buf.pop_front() { - println!("[RECEIVER] Sample: {:.2} psi", sample.pressure); + let (returned_buf, result) = if is_timeout { + let timeout = tokio::time::sleep(Duration::from_millis(1000)); + subscribed.receive_with_timeout(buffer, 2, 3, timeout).await + } else { + subscribed.receive(buffer, 2, 3).await + }; + if let Err(e) = result { + println!("[RECEIVER] Error receiving data: {:?}", e); + } else { + let count = returned_buf.sample_count(); + if count > 0 { + println!("[RECEIVER] Received {} samples", count); } - buf } + let mut buf = returned_buf; + while let Some(sample) = buf.pop_front() { + println!("[RECEIVER] Sample: {:.2} psi", sample.pressure); + } + buffer = buf; } } @@ -433,8 +439,45 @@ mod test { let consumer = consumer.await.expect("Failed to create consumer"); // Subscribe to one event let subscribed = consumer.left_tire.subscribe(5).unwrap(); - // Spawn async data processor - let processor_join_handle = tokio::spawn(async_data_processor_fn(subscribed)); + + let processor_join_handle = tokio::spawn(async_data_processor_fn(subscribed, false)); + processor_join_handle + .await + .expect("Error returned from task"); + let producer = sender_join_handle.await.expect("Error returned from task"); + + match producer.unoffer() { + Ok(_) => println!("Successfully unoffered the service"), + Err(e) => eprintln!("Failed to unoffer: {:?}", e), + } + + println!("=== Async subscription test with Lola runtime completed ===\n"); + } + + #[tokio::test(flavor = "multi_thread")] + async fn receive_with_timeout_and_send_using_multi_thread() { + println!("Starting async subscription test with Lola runtime"); + //Intentionally using service instance of test1, if you face issue add new service instance in config file and use it here. + let service_id = InstanceSpecifier::new("/Vehicle/Service1/Instance") + .expect("Failed to create InstanceSpecifier"); + let service_id_clone = service_id.clone(); + //consumer create + let consumer_runtime = get_test_runtime(); + //starting service discovery in async way, so that it can be discovered when producer offer service after some delay, and consumer is waiting for discovery result + let consumer = tokio::spawn(create_consumer_async(consumer_runtime, service_id)); + //simulate some delay before producer offer service, so that consumer is waiting for discovery + tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; + //Producer create + let producer_runtime = get_test_runtime(); + let producer = create_producer(producer_runtime, service_id_clone); + // Spawn async data sender + let sender_join_handle = tokio::spawn(async_data_sender_fn(producer)); + // Await consumer creation and subscribe to events + let consumer = consumer.await.expect("Failed to create consumer"); + // Subscribe to one event + let subscribed = consumer.left_tire.subscribe(5).unwrap(); + + let processor_join_handle = tokio::spawn(async_data_processor_fn(subscribed, true)); processor_join_handle .await .expect("Error returned from task"); diff --git a/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs b/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs index 5a9d66273..570606541 100644 --- a/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs +++ b/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs @@ -858,7 +858,7 @@ pub trait Subscription { scratch: SampleContainer>, new_samples: usize, max_samples: usize, - timeout_future: impl Future + Unpin + 'a, + timeout_future: impl Future + Send + 'static, ) -> impl Future>, Result<()>)> + 'a; } diff --git a/score/mw/com/impl/rust/com-api/com-api-concept/error.rs b/score/mw/com/impl/rust/com-api/com-api-concept/error.rs index 97acc5f29..95d680626 100644 --- a/score/mw/com/impl/rust/com-api/com-api-concept/error.rs +++ b/score/mw/com/impl/rust/com-api/com-api-concept/error.rs @@ -75,6 +75,8 @@ pub enum ReceiveFailedReason { SampleCountOutOfBounds { max: usize, requested: usize }, #[error("Receive operation was cancelled or timed out")] Cancelled, + #[error("Input value out of bounds, maximum sample {max}, but new sample is {requested}")] + InputValueOutOfBounds { max: usize, requested: usize }, } /// Comprehensive error reasons for event-related failures diff --git a/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs b/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs index 1e774ae96..05f1c90ab 100644 --- a/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs +++ b/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs @@ -30,7 +30,7 @@ #![allow(clippy::needless_lifetimes)] use crate::Debug; -use core::future::{self, Future}; +use core::future::Future; use core::marker::PhantomData; use core::mem::ManuallyDrop; use core::ops::{Deref, DerefMut}; @@ -559,9 +559,20 @@ where scratch: SampleContainer>, new_samples: usize, max_samples: usize, - timeout_future: impl Future + Unpin + 'a, + timeout_future: impl Future + Send + 'static, ) -> impl Future>, Result<()>)> + 'a { async move { + if new_samples > max_samples { + return ( + scratch, + Err(Error::ReceiveError( + ReceiveFailedReason::InputValueOutOfBounds { + max: max_samples, + requested: new_samples, + }, + )), + ); + } if max_samples > self.max_num_samples || new_samples > self.max_num_samples { return ( scratch, @@ -590,7 +601,7 @@ where new_samples, max_samples, total_received: 0, - timeout_future, + timeout_future: core::pin::pin!(timeout_future), } .await } diff --git a/score/mw/com/impl/rust/com-api/com-api-runtime-mock/runtime.rs b/score/mw/com/impl/rust/com-api/com-api-runtime-mock/runtime.rs index 8662c304e..844b210d2 100644 --- a/score/mw/com/impl/rust/com-api/com-api-runtime-mock/runtime.rs +++ b/score/mw/com/impl/rust/com-api/com-api-runtime-mock/runtime.rs @@ -343,8 +343,9 @@ where _scratch: SampleContainer>, _new_samples: usize, _max_samples: usize, - _timeout_future: impl Future + Unpin + 'a, - ) -> impl Future>, Result<()>)> + 'a { + _timeout_future: impl Future + Send + 'static, + ) -> impl Future>, Result<()>)> + 'a + { async { todo!() } } } From 8dae085f64639ac2b9244a25366500e666b73684 Mon Sep 17 00:00:00 2001 From: bharatgoswami Date: Mon, 4 May 2026 07:46:18 +0200 Subject: [PATCH 5/7] Rust::com Async Receive API with timeout * Added pre validation branch * Updated API documentation --- .../basic-consumer-producer.rs | 2 +- .../com-api-concept/com_api_concept.rs | 35 ++++++- .../com-api/com-api-runtime-lola/consumer.rs | 97 ++++++++++++------- .../com-api/com-api-runtime-mock/runtime.rs | 5 +- 4 files changed, 97 insertions(+), 42 deletions(-) diff --git a/score/mw/com/example/com-api-example/basic-consumer-producer.rs b/score/mw/com/example/com-api-example/basic-consumer-producer.rs index 58e4f7102..07bcaa352 100644 --- a/score/mw/com/example/com-api-example/basic-consumer-producer.rs +++ b/score/mw/com/example/com-api-example/basic-consumer-producer.rs @@ -388,7 +388,7 @@ mod test { //receiver function which use async receive to get data, it waits for new data and process it once it arrives, //it will receive data 10 times and print the received samples async fn async_data_processor_fn( - subscribed: impl Subscription + Send + 'static, + subscribed: impl Subscription, is_timeout: bool, ) { println!("[RECEIVER] Async data processor started"); diff --git a/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs b/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs index 570606541..f52a9f807 100644 --- a/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs +++ b/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs @@ -800,7 +800,7 @@ pub trait Subscription { /// # Parameters /// * `scratch` - Container for events from this subscription; must not be reused across /// different subscriptions - /// * `max_samples` - Maximum number of events to transfer + /// * `max_samples` - Maximum number of events to transfer, 0 value is treated as error /// /// # Returns /// @@ -823,9 +823,9 @@ pub trait Subscription { /// TODO: See above for C++ limitations. /// # Parameters /// * `scratch` - Container for events from this subscription - /// * `new_samples` - Minimum number of new events before resolution + /// * `new_samples` - Minimum number of new events before resolution, 0 value is treated as error /// * `max_samples` - Maximum number of events that shall be received from the communication - /// buffer and transferred to the container + /// buffer and transferred to the container, 0 value is treated as error /// /// # Returns /// Future that resolves to `(SampleContainer>, Result<()>)`. @@ -853,12 +853,39 @@ pub trait Subscription { max_samples: usize, ) -> impl Future>, Result<()>)> + 'a; + /// This method is extension of `receive` with timeout support. + /// It returns a future that resolves as soon as at least `new_samples` samples have been transferred + /// from the communication buffer to the sample container or the `timeout` future resolves. + /// + /// # Parameters + /// * `scratch` - Container for events from this subscription + /// * `new_samples` - Minimum number of new events before resolution, 0 value is treated as error + /// * `max_samples` - Maximum number of events that shall be received from the communication + /// buffer and transferred to the container, 0 value is treated as error + /// * `timeout` - Future that resolves when the receive operation should time out. If the timeout + /// future resolves before the required number of samples are received, the receive operation is + /// considered to have timed out. + /// + /// # Returns + /// Future that resolves to `(SampleContainer>, Result<()>)`. + /// The container is **always** returned (even on error, timeout, or cancellation) so the caller + /// never loses samples that were collected before the future resolved. + /// Error is returned if the receive operation fails or if the timeout future resolves before the required + /// number of samples are received. + /// + /// # Important Notes + /// User can not concurrenly call `receive_with_timeout` on the same subscription instance from + /// multiple threads or tasks. + /// `timeout` must be `'static` because timeout futures (e.g. `tokio::time::sleep`) own all + /// their state and do not borrow anything from the caller's scope. + /// And if you change to `'a` lifetime then it create lifetime bound not satisfied + /// error from rust (see issue for more information) fn receive_with_timeout<'a>( &'a self, scratch: SampleContainer>, new_samples: usize, max_samples: usize, - timeout_future: impl Future + Send + 'static, + timeout: impl Future + Send + 'static, ) -> impl Future>, Result<()>)> + 'a; } diff --git a/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs b/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs index 05f1c90ab..a59f60884 100644 --- a/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs +++ b/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs @@ -553,36 +553,52 @@ where // Cannot use `async fn` because the trait mandates `-> impl Future + 'a`, // requiring the returned future to be explicitly bound to the lifetime of `&self`. + // we do not need to cancle timeout future when receive future resolved, + // because ReceiveFuture drop will take care of cancelling the timeout future. #[allow(clippy::manual_async_fn)] fn receive_with_timeout<'a>( &'a self, scratch: SampleContainer>, new_samples: usize, max_samples: usize, - timeout_future: impl Future + Send + 'static, + timeout: impl Future + Send + 'static, ) -> impl Future>, Result<()>)> + 'a { async move { - if new_samples > max_samples { - return ( - scratch, - Err(Error::ReceiveError( - ReceiveFailedReason::InputValueOutOfBounds { - max: max_samples, - requested: new_samples, - }, - )), - ); - } - if max_samples > self.max_num_samples || new_samples > self.max_num_samples { - return ( - scratch, - Err(Error::ReceiveError( - ReceiveFailedReason::SampleCountOutOfBounds { - max: self.max_num_samples, - requested: max_samples.max(new_samples), - }, - )), - ); + match new_samples { + 0 => { + return ( + scratch, + Err(Error::ReceiveError( + ReceiveFailedReason::InputValueOutOfBounds { + max: self.max_num_samples, + requested: 0, + }, + )), + ) + } + _ if new_samples > max_samples => { + return ( + scratch, + Err(Error::ReceiveError( + ReceiveFailedReason::InputValueOutOfBounds { + max: max_samples, + requested: new_samples, + }, + )), + ); + } + _ if max_samples > self.max_num_samples || new_samples > self.max_num_samples => { + return ( + scratch, + Err(Error::ReceiveError( + ReceiveFailedReason::InputValueOutOfBounds { + max: self.max_num_samples, + requested: max_samples.max(new_samples), + }, + )), + ); + } + _ => {} } // Get the event guard to ensure no concurrent receive calls // on the same subscriber instance. @@ -601,7 +617,7 @@ where new_samples, max_samples, total_received: 0, - timeout_future: core::pin::pin!(timeout_future), + timeout: core::pin::pin!(timeout), } .await } @@ -613,7 +629,9 @@ where // a waker storage for async notifications, and parameters for managing the receive operation. // The Future implementation for ReceiveFuture defines the polling logic, // which attempts to receive samples and manages the state of the receive operation. -struct ReceiveFuture<'a, T: CommData + Debug, C: Future + Unpin> { +// Unpin bound is required for the timeout future to allow it to be safely pinned on the stack without +// heap allocation. +struct ReceiveFuture<'a, T: CommData + Debug, F: Future + Unpin> { event_guard: Option>, waker_storage: Arc, max_num_samples: usize, @@ -621,10 +639,10 @@ struct ReceiveFuture<'a, T: CommData + Debug, C: Future + Unpin> { new_samples: usize, max_samples: usize, total_received: usize, - timeout_future: C, + timeout: F, } -impl<'a, T: CommData + Debug, C: Future + Unpin> Future for ReceiveFuture<'a, T, C> { +impl<'a, T: CommData + Debug, F: Future + Unpin> Future for ReceiveFuture<'a, T, F> { type Output = (SampleContainer>, Result<()>); fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll { @@ -634,8 +652,8 @@ impl<'a, T: CommData + Debug, C: Future + Unpin> Future for Receive let max_num_samples = self.max_num_samples; let total_received = self.total_received; - // Poll cancel/timeout future first — C: Unpin so Pin::new is safe, no allocation - if Pin::new(&mut self.timeout_future).poll(ctx).is_ready() { + // Poll cancel/timeout future first — F: Unpin so Pin::new is safe, no allocation + if Pin::new(&mut self.timeout).poll(ctx).is_ready() { self.event_guard = None; return Poll::Ready(( self.scratch @@ -687,12 +705,15 @@ impl<'a, T: CommData + Debug, C: Future + Unpin> Future for Receive // Have some samples but not enough yet, wait for more via waker Poll::Pending } - Err(e) => Poll::Ready(( - self.scratch.take().expect( - "SampleContainer unavailable on error; was receive polled after completion?", - ), - Err(e), - )), + Err(e) => { + self.event_guard = None; + Poll::Ready(( + self.scratch.take().expect( + "SampleContainer unavailable on error; was receive polled after completion?", + ), + Err(e), + )) + } } } } @@ -973,6 +994,14 @@ fn try_receive_samples( max_num_samples: usize, max_samples: usize, ) -> Result { + if max_samples == 0 { + return Err(Error::ReceiveError( + ReceiveFailedReason::InputValueOutOfBounds { + max: max_num_samples, + requested: 0, + }, + )); + } if max_samples > max_num_samples { return Err(Error::ReceiveError( ReceiveFailedReason::SampleCountOutOfBounds { diff --git a/score/mw/com/impl/rust/com-api/com-api-runtime-mock/runtime.rs b/score/mw/com/impl/rust/com-api/com-api-runtime-mock/runtime.rs index 844b210d2..3aa918dee 100644 --- a/score/mw/com/impl/rust/com-api/com-api-runtime-mock/runtime.rs +++ b/score/mw/com/impl/rust/com-api/com-api-runtime-mock/runtime.rs @@ -343,9 +343,8 @@ where _scratch: SampleContainer>, _new_samples: usize, _max_samples: usize, - _timeout_future: impl Future + Send + 'static, - ) -> impl Future>, Result<()>)> + 'a - { + _timeout: impl Future + Send + 'static, + ) -> impl Future>, Result<()>)> + 'a { async { todo!() } } } From 612c43ca96284dfb963c62883c4662634a6bc3f7 Mon Sep 17 00:00:00 2001 From: bharatgoswami Date: Thu, 7 May 2026 14:47:13 +0200 Subject: [PATCH 6/7] Rust::com added doc test for Receive timeout API * API can support pin or unpin both kind of future --- .../com-api/com-api-runtime-lola/consumer.rs | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs b/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs index a59f60884..acab609b3 100644 --- a/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs +++ b/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs @@ -555,6 +555,38 @@ where // requiring the returned future to be explicitly bound to the lifetime of `&self`. // we do not need to cancle timeout future when receive future resolved, // because ReceiveFuture drop will take care of cancelling the timeout future. + // + /// Demonstrates passing a struct future that is `Unpin` via a `PhantomData` field + /// to `receive_with_timeout`. + /// + /// A struct with only a `PhantomData` field has no self-referential state, so + /// the compiler derives `Unpin` automatically. + /// + /// ``` + /// use com_api_concept::{CommData, SampleContainer, Subscription}; + /// use com_api_runtime_lola::LolaRuntimeImpl; + /// use core::marker::PhantomData; + /// + /// struct UnpinTimeout(PhantomData); + /// + /// impl std::future::Future for UnpinTimeout { + /// type Output = (); + /// fn poll( + /// self: std::pin::Pin<&mut Self>, + /// _cx: &mut std::task::Context<'_>, + /// ) -> std::task::Poll<()> { + /// std::task::Poll::Ready(()) + /// } + /// } + /// + /// fn demonstrate<'a, T, S>(sub: &'a S, scratch: SampleContainer>) + /// where + /// T: CommData + std::fmt::Debug, + /// S: Subscription, + /// { + /// let _future = sub.receive_with_timeout(scratch, 1, 1, UnpinTimeout::(PhantomData)); + /// } + /// ``` #[allow(clippy::manual_async_fn)] fn receive_with_timeout<'a>( &'a self, From 7cf95919f3bfa64afe4ef9e0b54b7c46067af35f Mon Sep 17 00:00:00 2001 From: bharatgoswami Date: Fri, 22 May 2026 13:22:55 +0530 Subject: [PATCH 7/7] Rust::com Updated Receive timeout API * Renamed the API * Moved receive default API at trait level * Updated Return value to usize --- .../basic-consumer-producer.rs | 12 ++---- .../com-api-concept/com_api_concept.rs | 41 ++++++++++++------- .../com-api/com-api-runtime-lola/consumer.rs | 25 +++-------- .../com-api/com-api-runtime-mock/runtime.rs | 14 +------ .../consumer_async_apis/consumer_app.rs | 4 ++ 5 files changed, 42 insertions(+), 54 deletions(-) diff --git a/score/mw/com/example/com-api-example/basic-consumer-producer.rs b/score/mw/com/example/com-api-example/basic-consumer-producer.rs index 07bcaa352..402d2551d 100644 --- a/score/mw/com/example/com-api-example/basic-consumer-producer.rs +++ b/score/mw/com/example/com-api-example/basic-consumer-producer.rs @@ -396,17 +396,13 @@ mod test { for _ in 0..5 { let (returned_buf, result) = if is_timeout { let timeout = tokio::time::sleep(Duration::from_millis(1000)); - subscribed.receive_with_timeout(buffer, 2, 3, timeout).await + subscribed.receive_timeout(buffer, 2, 3, timeout).await } else { subscribed.receive(buffer, 2, 3).await }; - if let Err(e) = result { - println!("[RECEIVER] Error receiving data: {:?}", e); - } else { - let count = returned_buf.sample_count(); - if count > 0 { - println!("[RECEIVER] Received {} samples", count); - } + match result { + Ok(count) => println!("[RECEIVER] Received {} samples", count), + Err(e) => eprintln!("[RECEIVER] Failed to receive samples: {:?}", e), } let mut buf = returned_buf; while let Some(sample) = buf.pop_front() { diff --git a/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs b/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs index f52a9f807..e1c5dd25f 100644 --- a/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs +++ b/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs @@ -232,7 +232,9 @@ impl InstanceSpecifier { } // Remove the single leading slash - let service_name = service_name.strip_prefix('/').unwrap(); + let Some(service_name) = service_name.strip_prefix('/') else { + return false; + }; // Check each character // Allowed: digits, lowercase, uppercase, underscore @@ -828,10 +830,12 @@ pub trait Subscription { /// buffer and transferred to the container, 0 value is treated as error /// /// # Returns - /// Future that resolves to `(SampleContainer>, Result<()>)`. - /// The container is **always** returned (even on error or cancellation) so the caller - /// never loses samples that were collected before the future resolved. - /// `Ok(())` means at least `new_samples` were received; `Err(_)` describes the failure. + /// Future that resolves to `(SampleContainer>, Result)`. + /// SampleContainer is **always** returned (even on error or cancellation) + /// Success case return contains the number of newly added samples to the container with + /// number of samples. + /// Failure case return the SampleContainer which user passed to the function and + /// an 'Error' indicating the reason for failure of the receive operation. /// /// # Important Notes /// User can not concurrenly call `receive` on the same subscription instance from @@ -846,12 +850,20 @@ pub trait Subscription { // The `Future` cannot have a `'static` lifetime. If we enforced `'static`, then `self` would // also need to be `'static`, which is not semantically correct for this use case. // Multiple threads cannot concurrently read the same event from a single subscription. + #[allow(clippy::manual_async_fn)] fn receive<'a>( &'a self, scratch: SampleContainer>, new_samples: usize, max_samples: usize, - ) -> impl Future>, Result<()>)> + 'a; + ) -> impl Future>, Result)> + 'a { + self.receive_timeout( + scratch, + new_samples, + max_samples, + core::future::pending::<()>(), + ) + } /// This method is extension of `receive` with timeout support. /// It returns a future that resolves as soon as at least `new_samples` samples have been transferred @@ -867,26 +879,27 @@ pub trait Subscription { /// considered to have timed out. /// /// # Returns - /// Future that resolves to `(SampleContainer>, Result<()>)`. - /// The container is **always** returned (even on error, timeout, or cancellation) so the caller - /// never loses samples that were collected before the future resolved. - /// Error is returned if the receive operation fails or if the timeout future resolves before the required - /// number of samples are received. + /// Future that resolves to `(SampleContainer>, Result)`. + /// SampleContainer is **always** returned (even on error or cancellation) + /// Success case return contains the number of newly added samples to the container with + /// number of samples. + /// Failure case return the SampleContainer which user passed to the function and + /// an 'Error' indicating the reason for failure of the receive operation. /// /// # Important Notes - /// User can not concurrenly call `receive_with_timeout` on the same subscription instance from + /// User can not concurrenly call `receive_timeout` on the same subscription instance from /// multiple threads or tasks. /// `timeout` must be `'static` because timeout futures (e.g. `tokio::time::sleep`) own all /// their state and do not borrow anything from the caller's scope. /// And if you change to `'a` lifetime then it create lifetime bound not satisfied /// error from rust (see issue for more information) - fn receive_with_timeout<'a>( + fn receive_timeout<'a>( &'a self, scratch: SampleContainer>, new_samples: usize, max_samples: usize, timeout: impl Future + Send + 'static, - ) -> impl Future>, Result<()>)> + 'a; + ) -> impl Future>, Result)> + 'a; } /// A trait for types that can be default-constructed in place, skipping intermediate moves. diff --git a/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs b/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs index acab609b3..e20745b99 100644 --- a/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs +++ b/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs @@ -536,21 +536,6 @@ where ) } - #[allow(clippy::manual_async_fn)] - fn receive<'a>( - &'a self, - scratch: SampleContainer>, - new_samples: usize, - max_samples: usize, - ) -> impl Future>, Result<()>)> + 'a { - self.receive_with_timeout( - scratch, - new_samples, - max_samples, - core::future::pending::<()>(), - ) - } - // Cannot use `async fn` because the trait mandates `-> impl Future + 'a`, // requiring the returned future to be explicitly bound to the lifetime of `&self`. // we do not need to cancle timeout future when receive future resolved, @@ -584,17 +569,17 @@ where /// T: CommData + std::fmt::Debug, /// S: Subscription, /// { - /// let _future = sub.receive_with_timeout(scratch, 1, 1, UnpinTimeout::(PhantomData)); + /// let _future = sub.receive_timeout(scratch, 1, 1, UnpinTimeout::(PhantomData)); /// } /// ``` #[allow(clippy::manual_async_fn)] - fn receive_with_timeout<'a>( + fn receive_timeout<'a>( &'a self, scratch: SampleContainer>, new_samples: usize, max_samples: usize, timeout: impl Future + Send + 'static, - ) -> impl Future>, Result<()>)> + 'a { + ) -> impl Future>, Result)> + 'a { async move { match new_samples { 0 => { @@ -675,7 +660,7 @@ struct ReceiveFuture<'a, T: CommData + Debug, F: Future + Unpin> { } impl<'a, T: CommData + Debug, F: Future + Unpin> Future for ReceiveFuture<'a, T, F> { - type Output = (SampleContainer>, Result<()>); + type Output = (SampleContainer>, Result); fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll { // Extract all immutable values upfront to avoid borrow conflicts with self in the callback @@ -731,7 +716,7 @@ impl<'a, T: CommData + Debug, F: Future + Unpin> Future for Receive self.scratch.take().expect( "SampleContainer is not available when returning Future result", ), - Ok(()), + Ok(self.total_received), )); } // Have some samples but not enough yet, wait for more via waker diff --git a/score/mw/com/impl/rust/com-api/com-api-runtime-mock/runtime.rs b/score/mw/com/impl/rust/com-api/com-api-runtime-mock/runtime.rs index 3aa918dee..d03ba43df 100644 --- a/score/mw/com/impl/rust/com-api/com-api-runtime-mock/runtime.rs +++ b/score/mw/com/impl/rust/com-api/com-api-runtime-mock/runtime.rs @@ -328,23 +328,13 @@ where } #[allow(clippy::manual_async_fn)] - fn receive<'a>( - &'a self, - _scratch: SampleContainer>, - _new_samples: usize, - _max_samples: usize, - ) -> impl Future>, Result<()>)> + 'a { - async { todo!() } - } - - #[allow(clippy::manual_async_fn)] - fn receive_with_timeout<'a>( + fn receive_timeout<'a>( &'a self, _scratch: SampleContainer>, _new_samples: usize, _max_samples: usize, _timeout: impl Future + Send + 'static, - ) -> impl Future>, Result<()>)> + 'a { + ) -> impl Future>, Result)> + 'a { async { todo!() } } } diff --git a/score/mw/com/test/basic_rust_api/consumer_async_apis/consumer_app.rs b/score/mw/com/test/basic_rust_api/consumer_async_apis/consumer_app.rs index 9c787140b..d6b60b6ab 100644 --- a/score/mw/com/test/basic_rust_api/consumer_async_apis/consumer_app.rs +++ b/score/mw/com/test/basic_rust_api/consumer_async_apis/consumer_app.rs @@ -96,6 +96,10 @@ async fn async_main() { eprintln!("[bigdata-consumer] Receive error: {:?}", e); return; } + // Process the received samples and prepare the buffer for the next call to `receive`. + // We are retuning the same buffer instance to `receive` to avoid unnecessary allocations. + // So emptying buffer means processing, and if user required it can pass with samples still in buffer + // and `receive` will add new samples to it until it reaches the max capacity of the buffer. sample_buf = { let count = returned_buf.sample_count(); let mut buf = returned_buf;