diff --git a/score/mw/com/example/com-api-example/basic-consumer-producer.rs b/score/mw/com/example/com-api-example/basic-consumer-producer.rs index d867bab98..402d2551d 100644 --- a/score/mw/com/example/com-api-example/basic-consumer-producer.rs +++ b/score/mw/com/example/com-api-example/basic-consumer-producer.rs @@ -328,33 +328,25 @@ mod test { for attempt in 0..MAX_ATTEMPTS { println!("[RECEIVER] Attempt {}", attempt); - // Match and immediately reassign in all branches - sample_buf = match subscribed.receive(sample_buf, 1, 3).await { - Ok(returned_buf) => { - let count = returned_buf.sample_count(); - - if count > 0 { - total_received += count; - println!( - "[RECEIVER] Received {} samples (total: {})", - count, total_received - ); - - // Create a mutable version to pop from - let mut buf = returned_buf; - while let Some(sample) = buf.pop_front() { - println!("[RECEIVER] Sample: {:.2} psi", sample.pressure); - } - buf - } else { - returned_buf - } - } - Err(e) => { + // Destructure tuple - container is always returned even on error + let (returned_buf, result) = subscribed.receive(sample_buf, 1, 3).await; + sample_buf = { + let count = returned_buf.sample_count(); + if let Err(e) = result { println!("[RECEIVER] Error on attempt {}: {:?}", attempt, e); - // Create a fresh buffer if there's an error - SampleContainer::new(5) + } else if count > 0 { + total_received += count; + println!( + "[RECEIVER] Received {} samples (total: {})", + count, total_received + ); } + // Drain printed samples + let mut buf = returned_buf; + while let Some(sample) = buf.pop_front() { + println!("[RECEIVER] Sample: {:.2} psi", sample.pressure); + } + buf }; } @@ -376,7 +368,7 @@ mod test { let uninit_sample = match offered_producer.left_tire.allocate() { Ok(sample) => sample, Err(e) => { - eprintln!("Failed to allocate sample: {:?}", e); + eprintln!("[SENDER] Failed to allocate sample: {:?}", e); continue; } }; @@ -385,9 +377,9 @@ mod test { }); match sample.send() { Ok(_) => (), - Err(e) => eprintln!("Failed to send sample: {:?}", e), + Err(e) => eprintln!("[SENDER] Failed to send sample: {:?}", e), } - println!("Sent sample with pressure: {}", 1.0 + i as f32); + println!("[SENDER] Sent sample with pressure: {}", 1.0 + i as f32); tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; } offered_producer @@ -395,29 +387,28 @@ mod test { //receiver function which use async receive to get data, it waits for new data and process it once it arrives, //it will receive data 10 times and print the received samples - async fn async_data_processor_fn(subscribed: impl Subscription) { + async fn async_data_processor_fn( + subscribed: impl Subscription, + is_timeout: bool, + ) { println!("[RECEIVER] Async data processor started"); let mut buffer = SampleContainer::new(5); for _ in 0..5 { - buffer = match subscribed.receive(buffer, 2, 3).await { - Ok(returned_buf) => { - let count = returned_buf.sample_count(); - if count > 0 { - println!("[RECEIVER] Received {} samples", count); - let mut buf = returned_buf; - while let Some(sample) = buf.pop_front() { - println!("[RECEIVER] Sample: {:.2} psi", sample.pressure); - } - buf - } else { - returned_buf - } - } - Err(e) => { - println!("[RECEIVER] Error receiving data: {:?}", e); - SampleContainer::new(5) - } + let (returned_buf, result) = if is_timeout { + let timeout = tokio::time::sleep(Duration::from_millis(1000)); + subscribed.receive_timeout(buffer, 2, 3, timeout).await + } else { + subscribed.receive(buffer, 2, 3).await + }; + match result { + Ok(count) => println!("[RECEIVER] Received {} samples", count), + Err(e) => eprintln!("[RECEIVER] Failed to receive samples: {:?}", e), + } + let mut buf = returned_buf; + while let Some(sample) = buf.pop_front() { + println!("[RECEIVER] Sample: {:.2} psi", sample.pressure); } + buffer = buf; } } @@ -444,8 +435,45 @@ mod test { let consumer = consumer.await.expect("Failed to create consumer"); // Subscribe to one event let subscribed = consumer.left_tire.subscribe(5).unwrap(); - // Spawn async data processor - let processor_join_handle = tokio::spawn(async_data_processor_fn(subscribed)); + + let processor_join_handle = tokio::spawn(async_data_processor_fn(subscribed, false)); + processor_join_handle + .await + .expect("Error returned from task"); + let producer = sender_join_handle.await.expect("Error returned from task"); + + match producer.unoffer() { + Ok(_) => println!("Successfully unoffered the service"), + Err(e) => eprintln!("Failed to unoffer: {:?}", e), + } + + println!("=== Async subscription test with Lola runtime completed ===\n"); + } + + #[tokio::test(flavor = "multi_thread")] + async fn receive_with_timeout_and_send_using_multi_thread() { + println!("Starting async subscription test with Lola runtime"); + //Intentionally using service instance of test1, if you face issue add new service instance in config file and use it here. + let service_id = InstanceSpecifier::new("/Vehicle/Service1/Instance") + .expect("Failed to create InstanceSpecifier"); + let service_id_clone = service_id.clone(); + //consumer create + let consumer_runtime = get_test_runtime(); + //starting service discovery in async way, so that it can be discovered when producer offer service after some delay, and consumer is waiting for discovery result + let consumer = tokio::spawn(create_consumer_async(consumer_runtime, service_id)); + //simulate some delay before producer offer service, so that consumer is waiting for discovery + tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; + //Producer create + let producer_runtime = get_test_runtime(); + let producer = create_producer(producer_runtime, service_id_clone); + // Spawn async data sender + let sender_join_handle = tokio::spawn(async_data_sender_fn(producer)); + // Await consumer creation and subscribe to events + let consumer = consumer.await.expect("Failed to create consumer"); + // Subscribe to one event + let subscribed = consumer.left_tire.subscribe(5).unwrap(); + + let processor_join_handle = tokio::spawn(async_data_processor_fn(subscribed, true)); processor_join_handle .await .expect("Error returned from task"); diff --git a/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs b/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs index ac1862c46..e1c5dd25f 100644 --- a/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs +++ b/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs @@ -232,7 +232,9 @@ impl InstanceSpecifier { } // Remove the single leading slash - let service_name = service_name.strip_prefix('/').unwrap(); + let Some(service_name) = service_name.strip_prefix('/') else { + return false; + }; // Check each character // Allowed: digits, lowercase, uppercase, underscore @@ -800,7 +802,7 @@ pub trait Subscription { /// # Parameters /// * `scratch` - Container for events from this subscription; must not be reused across /// different subscriptions - /// * `max_samples` - Maximum number of events to transfer + /// * `max_samples` - Maximum number of events to transfer, 0 value is treated as error /// /// # Returns /// @@ -823,13 +825,17 @@ pub trait Subscription { /// TODO: See above for C++ limitations. /// # Parameters /// * `scratch` - Container for events from this subscription - /// * `new_samples` - Minimum number of new events before resolution + /// * `new_samples` - Minimum number of new events before resolution, 0 value is treated as error /// * `max_samples` - Maximum number of events that shall be received from the communication - /// buffer and transferred to the container + /// buffer and transferred to the container, 0 value is treated as error /// /// # Returns - /// Future that resolves to the number of newly added events to the container with at least - /// `new_samples` number of new events. + /// Future that resolves to `(SampleContainer>, Result)`. + /// SampleContainer is **always** returned (even on error or cancellation) + /// Success case return contains the number of newly added samples to the container with + /// number of samples. + /// Failure case return the SampleContainer which user passed to the function and + /// an 'Error' indicating the reason for failure of the receive operation. /// /// # Important Notes /// User can not concurrenly call `receive` on the same subscription instance from @@ -844,12 +850,56 @@ pub trait Subscription { // The `Future` cannot have a `'static` lifetime. If we enforced `'static`, then `self` would // also need to be `'static`, which is not semantically correct for this use case. // Multiple threads cannot concurrently read the same event from a single subscription. + #[allow(clippy::manual_async_fn)] fn receive<'a>( &'a self, scratch: SampleContainer>, new_samples: usize, max_samples: usize, - ) -> impl Future>>> + 'a; + ) -> impl Future>, Result)> + 'a { + self.receive_timeout( + scratch, + new_samples, + max_samples, + core::future::pending::<()>(), + ) + } + + /// This method is extension of `receive` with timeout support. + /// It returns a future that resolves as soon as at least `new_samples` samples have been transferred + /// from the communication buffer to the sample container or the `timeout` future resolves. + /// + /// # Parameters + /// * `scratch` - Container for events from this subscription + /// * `new_samples` - Minimum number of new events before resolution, 0 value is treated as error + /// * `max_samples` - Maximum number of events that shall be received from the communication + /// buffer and transferred to the container, 0 value is treated as error + /// * `timeout` - Future that resolves when the receive operation should time out. If the timeout + /// future resolves before the required number of samples are received, the receive operation is + /// considered to have timed out. + /// + /// # Returns + /// Future that resolves to `(SampleContainer>, Result)`. + /// SampleContainer is **always** returned (even on error or cancellation) + /// Success case return contains the number of newly added samples to the container with + /// number of samples. + /// Failure case return the SampleContainer which user passed to the function and + /// an 'Error' indicating the reason for failure of the receive operation. + /// + /// # Important Notes + /// User can not concurrenly call `receive_timeout` on the same subscription instance from + /// multiple threads or tasks. + /// `timeout` must be `'static` because timeout futures (e.g. `tokio::time::sleep`) own all + /// their state and do not borrow anything from the caller's scope. + /// And if you change to `'a` lifetime then it create lifetime bound not satisfied + /// error from rust (see issue for more information) + fn receive_timeout<'a>( + &'a self, + scratch: SampleContainer>, + new_samples: usize, + max_samples: usize, + timeout: impl Future + Send + 'static, + ) -> impl Future>, Result)> + 'a; } /// A trait for types that can be default-constructed in place, skipping intermediate moves. diff --git a/score/mw/com/impl/rust/com-api/com-api-concept/error.rs b/score/mw/com/impl/rust/com-api/com-api-concept/error.rs index ef7c2fd93..95d680626 100644 --- a/score/mw/com/impl/rust/com-api/com-api-concept/error.rs +++ b/score/mw/com/impl/rust/com-api/com-api-concept/error.rs @@ -73,6 +73,10 @@ pub enum ReceiveFailedReason { BufferUnavailable, #[error("Sample size out of bounds, expected at most {max}, but got {requested}")] SampleCountOutOfBounds { max: usize, requested: usize }, + #[error("Receive operation was cancelled or timed out")] + Cancelled, + #[error("Input value out of bounds, maximum sample {max}, but new sample is {requested}")] + InputValueOutOfBounds { max: usize, requested: usize }, } /// Comprehensive error reasons for event-related failures diff --git a/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs b/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs index 73c8f90f5..e20745b99 100644 --- a/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs +++ b/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs @@ -258,8 +258,9 @@ impl NativeProxyEventBase { pub fn new(proxy: &NonNull, interface_id: &str, identifier: &str) -> Result { //SAFETY: It is safe as we are passing valid proxy pointer and interface id to get event // proxy pointer is created during consumer creation - let raw_event_ptr = - unsafe { bridge_ffi_rs::get_event_from_proxy(proxy.as_ptr(), interface_id, identifier) }; + let raw_event_ptr = unsafe { + bridge_ffi_rs::get_event_from_proxy(proxy.as_ptr(), interface_id, identifier) + }; let proxy_event_ptr = std::ptr::NonNull::new(raw_event_ptr) .ok_or(Error::EventError(EventFailedReason::EventCreationFailed))?; Ok(Self { proxy_event_ptr }) @@ -537,21 +538,84 @@ where // Cannot use `async fn` because the trait mandates `-> impl Future + 'a`, // requiring the returned future to be explicitly bound to the lifetime of `&self`. + // we do not need to cancle timeout future when receive future resolved, + // because ReceiveFuture drop will take care of cancelling the timeout future. + // + /// Demonstrates passing a struct future that is `Unpin` via a `PhantomData` field + /// to `receive_with_timeout`. + /// + /// A struct with only a `PhantomData` field has no self-referential state, so + /// the compiler derives `Unpin` automatically. + /// + /// ``` + /// use com_api_concept::{CommData, SampleContainer, Subscription}; + /// use com_api_runtime_lola::LolaRuntimeImpl; + /// use core::marker::PhantomData; + /// + /// struct UnpinTimeout(PhantomData); + /// + /// impl std::future::Future for UnpinTimeout { + /// type Output = (); + /// fn poll( + /// self: std::pin::Pin<&mut Self>, + /// _cx: &mut std::task::Context<'_>, + /// ) -> std::task::Poll<()> { + /// std::task::Poll::Ready(()) + /// } + /// } + /// + /// fn demonstrate<'a, T, S>(sub: &'a S, scratch: SampleContainer>) + /// where + /// T: CommData + std::fmt::Debug, + /// S: Subscription, + /// { + /// let _future = sub.receive_timeout(scratch, 1, 1, UnpinTimeout::(PhantomData)); + /// } + /// ``` #[allow(clippy::manual_async_fn)] - fn receive<'a>( + fn receive_timeout<'a>( &'a self, scratch: SampleContainer>, new_samples: usize, max_samples: usize, - ) -> impl Future>>> + 'a { + timeout: impl Future + Send + 'static, + ) -> impl Future>, Result)> + 'a { async move { - if max_samples > self.max_num_samples || new_samples > self.max_num_samples { - return Err(Error::ReceiveError( - ReceiveFailedReason::SampleCountOutOfBounds { - max: self.max_num_samples, - requested: max_samples.max(new_samples), - }, - )); + match new_samples { + 0 => { + return ( + scratch, + Err(Error::ReceiveError( + ReceiveFailedReason::InputValueOutOfBounds { + max: self.max_num_samples, + requested: 0, + }, + )), + ) + } + _ if new_samples > max_samples => { + return ( + scratch, + Err(Error::ReceiveError( + ReceiveFailedReason::InputValueOutOfBounds { + max: max_samples, + requested: new_samples, + }, + )), + ); + } + _ if max_samples > self.max_num_samples || new_samples > self.max_num_samples => { + return ( + scratch, + Err(Error::ReceiveError( + ReceiveFailedReason::InputValueOutOfBounds { + max: self.max_num_samples, + requested: max_samples.max(new_samples), + }, + )), + ); + } + _ => {} } // Get the event guard to ensure no concurrent receive calls // on the same subscriber instance. @@ -570,6 +634,7 @@ where new_samples, max_samples, total_received: 0, + timeout: core::pin::pin!(timeout), } .await } @@ -581,7 +646,9 @@ where // a waker storage for async notifications, and parameters for managing the receive operation. // The Future implementation for ReceiveFuture defines the polling logic, // which attempts to receive samples and manages the state of the receive operation. -struct ReceiveFuture<'a, T: CommData + Debug> { +// Unpin bound is required for the timeout future to allow it to be safely pinned on the stack without +// heap allocation. +struct ReceiveFuture<'a, T: CommData + Debug, F: Future + Unpin> { event_guard: Option>, waker_storage: Arc, max_num_samples: usize, @@ -589,10 +656,11 @@ struct ReceiveFuture<'a, T: CommData + Debug> { new_samples: usize, max_samples: usize, total_received: usize, + timeout: F, } -impl<'a, T: CommData + Debug> Future for ReceiveFuture<'a, T> { - type Output = Result>>; +impl<'a, T: CommData + Debug, F: Future + Unpin> Future for ReceiveFuture<'a, T, F> { + type Output = (SampleContainer>, Result); fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll { // Extract all immutable values upfront to avoid borrow conflicts with self in the callback @@ -601,6 +669,17 @@ impl<'a, T: CommData + Debug> Future for ReceiveFuture<'a, T> { let max_num_samples = self.max_num_samples; let total_received = self.total_received; + // Poll cancel/timeout future first — F: Unpin so Pin::new is safe, no allocation + if Pin::new(&mut self.timeout).poll(ctx).is_ready() { + self.event_guard = None; + return Poll::Ready(( + self.scratch + .take() + .expect("SampleContainer missing on timeout/cancel"), + Err(Error::ReceiveError(ReceiveFailedReason::Cancelled)), + )); + } + // Register the current waker to be notified when new samples arrive via FFI callback self.waker_storage.register(ctx.waker()); @@ -617,6 +696,7 @@ impl<'a, T: CommData + Debug> Future for ReceiveFuture<'a, T> { self.scratch = Some(scratch); result } else { + self.scratch = Some(scratch); Err(Error::ReceiveError(ReceiveFailedReason::ReceiveError)) } } else { @@ -632,15 +712,25 @@ impl<'a, T: CommData + Debug> Future for ReceiveFuture<'a, T> { //event_guard will be dropped here, allowing new receive calls to access the // proxy event self.event_guard = None; - return Poll::Ready(Ok(self - .scratch - .take() - .expect("SampleContainer is not available when returning Future result"))); + return Poll::Ready(( + self.scratch.take().expect( + "SampleContainer is not available when returning Future result", + ), + Ok(self.total_received), + )); } // Have some samples but not enough yet, wait for more via waker Poll::Pending } - Err(e) => Poll::Ready(Err(e)), + Err(e) => { + self.event_guard = None; + Poll::Ready(( + self.scratch.take().expect( + "SampleContainer unavailable on error; was receive polled after completion?", + ), + Err(e), + )) + } } } } @@ -921,6 +1011,14 @@ fn try_receive_samples( max_num_samples: usize, max_samples: usize, ) -> Result { + if max_samples == 0 { + return Err(Error::ReceiveError( + ReceiveFailedReason::InputValueOutOfBounds { + max: max_num_samples, + requested: 0, + }, + )); + } if max_samples > max_num_samples { return Err(Error::ReceiveError( ReceiveFailedReason::SampleCountOutOfBounds { diff --git a/score/mw/com/impl/rust/com-api/com-api-runtime-mock/runtime.rs b/score/mw/com/impl/rust/com-api/com-api-runtime-mock/runtime.rs index d099256f3..d03ba43df 100644 --- a/score/mw/com/impl/rust/com-api/com-api-runtime-mock/runtime.rs +++ b/score/mw/com/impl/rust/com-api/com-api-runtime-mock/runtime.rs @@ -328,12 +328,13 @@ where } #[allow(clippy::manual_async_fn)] - fn receive<'a>( + fn receive_timeout<'a>( &'a self, _scratch: SampleContainer>, _new_samples: usize, _max_samples: usize, - ) -> impl Future>>> + 'a { + _timeout: impl Future + Send + 'static, + ) -> impl Future>, Result)> + 'a { async { todo!() } } } @@ -531,16 +532,10 @@ mod test { let test_subscriber = super::SubscriberImpl::::new(); // block on an asynchronous reception of data from test_subscriber futures::executor::block_on(async { - let mut sample_buf = SampleContainer::new(); - match test_subscriber.receive(&mut sample_buf, 1, 1).await { - Ok(0) => panic!("No sample received"), - Ok(x) => { - println!( - "{} samples received: sample[0] = {}", - x, - *sample_buf.front().unwrap() - ) - } + let sample_buf = SampleContainer::new(1); + let (_returned_buf, result) = test_subscriber.receive(sample_buf, 1, 1).await; + match result { + Ok(()) => {} Err(e) => panic!("{:?}", e), } }) diff --git a/score/mw/com/impl/rust/com-api/com-api/com_api.rs b/score/mw/com/impl/rust/com-api/com-api/com_api.rs index ba093a015..4be3e66f1 100644 --- a/score/mw/com/impl/rust/com-api/com-api/com_api.rs +++ b/score/mw/com/impl/rust/com-api/com-api/com_api.rs @@ -114,7 +114,8 @@ //! } //! //! // 6. Or wait asynchronously for events -//! let n = subscription.receive(&mut container, 1, 3).await?; +//! let (container, result) = subscription.receive(container, 1, 3).await; +//! result?; //! ``` //! # Further reading //! - `com_api_concept` crate — trait definitions and full API documentation diff --git a/score/mw/com/test/basic_rust_api/consumer_async_apis/consumer_app.rs b/score/mw/com/test/basic_rust_api/consumer_async_apis/consumer_app.rs index 309e926ca..d6b60b6ab 100644 --- a/score/mw/com/test/basic_rust_api/consumer_async_apis/consumer_app.rs +++ b/score/mw/com/test/basic_rust_api/consumer_async_apis/consumer_app.rs @@ -89,33 +89,31 @@ async fn async_main() { let mut sample_buf = SampleContainer::new(MAX_SAMPLES_PER_CALL); // `receive` awaits until at least `min_samples` (1) are available. while received_total < num_cycles { - sample_buf = match subscription + let (returned_buf, result) = subscription .receive(sample_buf, 1, MAX_SAMPLES_PER_CALL) - .await - { - Ok(returned_buf) => { - let count = returned_buf.sample_count(); - if count > 0 { - let mut buf = returned_buf; - for _ in 0..count { - if let Some(sample) = buf.pop_front() { - println!("[bigdata-consumer] Received sample x={}", sample.x); - } - } - received_total += count; - println!( - "[bigdata-consumer] Progress: {}/{}", - received_total, num_cycles - ); - buf - } else { - returned_buf + .await; + if let Err(e) = result { + eprintln!("[bigdata-consumer] Receive error: {:?}", e); + return; + } + // Process the received samples and prepare the buffer for the next call to `receive`. + // We are retuning the same buffer instance to `receive` to avoid unnecessary allocations. + // So emptying buffer means processing, and if user required it can pass with samples still in buffer + // and `receive` will add new samples to it until it reaches the max capacity of the buffer. + sample_buf = { + let count = returned_buf.sample_count(); + let mut buf = returned_buf; + for _ in 0..count { + if let Some(sample) = buf.pop_front() { + println!("[bigdata-consumer] Received sample x={}", sample.x); } } - Err(e) => { - eprintln!("[bigdata-consumer] Receive error: {:?}", e); - return; - } + received_total += count; + println!( + "[bigdata-consumer] Progress: {}/{}", + received_total, num_cycles + ); + buf }; }