diff --git a/rs/moq-native/tests/broadcast.rs b/rs/moq-native/tests/broadcast.rs index e4a001fd7..dcd47e96f 100644 --- a/rs/moq-native/tests/broadcast.rs +++ b/rs/moq-native/tests/broadcast.rs @@ -227,6 +227,104 @@ async fn broadcast_moq_lite_05_timestamps_webtransport() { lite05_timestamp_roundtrip("https").await; } +/// Lite05 Track Stream: the client resolves a track's immutable properties via +/// `info()` (a TRACK request answered with TRACK_INFO) without subscribing, then +/// subscribes and reads a frame. Exercises the on-demand info path end-to-end. +async fn lite05_info_roundtrip(scheme: &str) { + use moq_native::moq_net::Timescale; + + let pub_origin = Origin::random().produce(); + let mut broadcast = pub_origin.create_broadcast("test").expect("create broadcast"); + let mut track = broadcast + .create_track(Track::new("video").with_timescale(Timescale::MICRO)) + .expect("create track"); + + let mut group = track.append_group().expect("append group"); + let frame = moq_native::moq_net::Frame { + size: 5, + timestamp: Some(moq_native::moq_net::Timestamp::new(10_000, Timescale::MICRO).unwrap()), + }; + let mut writer = group.create_frame(frame).expect("create frame"); + writer.write(bytes::Bytes::from_static(b"hello")).expect("write frame"); + writer.finish().expect("finish frame"); + group.finish().expect("finish group"); + + let mut server_config = moq_native::ServerConfig::default(); + server_config.bind = Some("[::]:0".to_string()); + server_config.tls.generate = vec!["localhost".into()]; + server_config.version = vec!["moq-lite-05-wip".parse().unwrap()]; + let mut server = server_config.init().expect("init server"); + let addr = server.local_addr().expect("local addr"); + + let sub_origin = Origin::random().produce(); + let mut announcements = sub_origin.consume().announced(); + + let mut client_config = moq_native::ClientConfig::default(); + client_config.tls.disable_verify = Some(true); + client_config.version = vec!["moq-lite-05-wip".parse().unwrap()]; + let client = client_config.init().expect("init client"); + let url: url::Url = format!("{scheme}://localhost:{}", addr.port()).parse().unwrap(); + + let server_handle = tokio::spawn(async move { + let request = server.accept().await.expect("no incoming connection"); + let session = request.with_publisher(pub_origin.clone()).ok().await?; + let _broadcast = broadcast; + let _track = track; + let _ = session.closed().await; + Ok::<_, anyhow::Error>(()) + }); + + let client = client.with_consumer(sub_origin); + let session = tokio::time::timeout(TIMEOUT, client.connect(url)) + .await + .expect("client connect timed out") + .expect("client connect failed"); + + let (path, bc) = tokio::time::timeout(TIMEOUT, announcements.next()) + .await + .expect("announce timed out") + .expect("origin closed"); + assert_eq!(path.as_str(), "test"); + let bc = bc.broadcast().expect("expected announce, got unannounce"); + + // Resolve the track's immutable info without subscribing. + let info = tokio::time::timeout(TIMEOUT, bc.consume_track("video").info()) + .await + .expect("info timed out") + .expect("info failed"); + assert_eq!(info.timescale, Some(Timescale::MICRO)); + + // A subscribe still works (and reuses the now-cached info). + let mut track_sub = bc + .consume_track("video") + .subscribe(None) + .await + .expect("subscribe failed"); + let mut group_sub = tokio::time::timeout(TIMEOUT, track_sub.recv_group()) + .await + .expect("recv_group timed out") + .expect("recv_group failed") + .expect("track closed prematurely"); + let frame = tokio::time::timeout(TIMEOUT, group_sub.read_frame()) + .await + .expect("read_frame timed out") + .expect("read_frame failed") + .expect("group closed prematurely"); + assert_eq!(&*frame, b"hello"); + + drop(session); + server_handle + .await + .expect("server task panicked") + .expect("server task failed"); +} + +#[tracing_test::traced_test] +#[tokio::test] +async fn broadcast_moq_lite_05_info_webtransport() { + lite05_info_roundtrip("https").await; +} + /// On Lite05 a publisher that doesn't advertise a timescale still works: /// SUBSCRIBE_OK carries `timescale = 0` and neither side encodes a /// per-frame timestamp byte. Subscribers receive `frame.timestamp = None`. diff --git a/rs/moq-net/src/ietf/subscriber.rs b/rs/moq-net/src/ietf/subscriber.rs index d167c8ae2..d8fe277be 100644 --- a/rs/moq-net/src/ietf/subscriber.rs +++ b/rs/moq-net/src/ietf/subscriber.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use crate::{ Broadcast, BroadcastDynamic, Error, Frame, FrameProducer, Group, GroupProducer, MAX_FRAME_SIZE, OriginProducer, - Path, PathOwned, StatsHandle, SubscriberStats, SubscriberTrack, Track, TrackProducer, TrackRequest, + Path, PathOwned, PendingTrack, StatsHandle, SubscriberStats, SubscriberTrack, Track, TrackProducer, coding::{Reader, Stream}, ietf::{self, Control, FilterType, GroupOrder, RequestId}, model::BroadcastProducer, @@ -536,7 +536,7 @@ impl Subscriber { Ok(()) } - async fn run_subscribe(&mut self, broadcast_path: Path<'_>, broadcast: BroadcastDynamic, request: TrackRequest) { + async fn run_subscribe(&mut self, broadcast_path: Path<'_>, broadcast: BroadcastDynamic, request: PendingTrack) { // Accept right away: IETF group data can arrive before SubscribeOk, so we // need the producer in place to route it. This also unblocks the // downstream subscriber's `consume_track`. diff --git a/rs/moq-net/src/lite/mod.rs b/rs/moq-net/src/lite/mod.rs index 4b1612594..4df1bb449 100644 --- a/rs/moq-net/src/lite/mod.rs +++ b/rs/moq-net/src/lite/mod.rs @@ -19,6 +19,7 @@ mod session; mod stream; mod subscribe; mod subscriber; +mod track; mod version; pub use announce::*; @@ -37,4 +38,6 @@ pub(super) use session::*; pub use stream::*; pub use subscribe::*; use subscriber::*; +#[allow(unused_imports)] +pub use track::*; pub use version::Version; diff --git a/rs/moq-net/src/lite/publisher.rs b/rs/moq-net/src/lite/publisher.rs index 9a020fa75..4a1c4a59f 100644 --- a/rs/moq-net/src/lite/publisher.rs +++ b/rs/moq-net/src/lite/publisher.rs @@ -80,6 +80,7 @@ impl Publisher { match kind { lite::ControlType::Announce => self.recv_announce(stream).await, lite::ControlType::Subscribe => self.recv_subscribe(stream).await, + lite::ControlType::Track => self.recv_track(stream).await, lite::ControlType::Probe => { self.recv_probe(stream).await; Ok(()) @@ -461,8 +462,9 @@ impl Publisher { .await?; // Compress only when the producer marked the track worth it and the - // negotiated draft understands the SUBSCRIBE_OK codec field. Older drafts - // (lite-04 and below) get None and the frames stream verbatim. + // negotiated draft understands the codec field (carried in SUBSCRIBE_OK on + // lite-04 and below, in TRACK_INFO on lite-05+). Older drafts without it + // get None and the frames stream verbatim. let supports_compression = !matches!( version, Version::Lite01 | Version::Lite02 | Version::Lite03 | Version::Lite04 @@ -487,20 +489,23 @@ impl Publisher { // broadcast. Dropping this guard (subscription end) releases it. let _broadcast_sub = broadcasts.subscribe(&absolute); - let info = lite::SubscribeOk { - priority: subscribe.priority, - ordered: false, - max_latency: std::time::Duration::ZERO, - start_group: None, - end_group: None, - compression, - timescale, - // Announce the publisher's cache window so the subscriber (a relay) - // re-serves with the same eviction window. Pre-lite-05 peers ignore it. - cache: track.cache, - }; + // Lite05+ accepts a subscription implicitly (rejection is a stream reset) + // and serves the immutable properties over a TRACK_INFO stream instead. + // Older drafts confirm acceptance with SUBSCRIBE_OK here. + if matches!( + version, + Version::Lite01 | Version::Lite02 | Version::Lite03 | Version::Lite04 + ) { + let info = lite::SubscribeOk { + priority: subscribe.priority, + ordered: false, + max_latency: std::time::Duration::ZERO, + start_group: None, + end_group: None, + }; - stream.writer.encode(&lite::SubscribeResponse::Ok(info)).await?; + stream.writer.encode(&lite::SubscribeResponse::Ok(info)).await?; + } // Track-level subscriber priority. SUBSCRIBE_UPDATE messages broadcast new values // to both run_track (so future groups inherit the new priority) and serve_group @@ -538,6 +543,80 @@ impl Publisher { stream.writer.finish()?; stream.writer.closed().await } + + /// Serve a Track Stream: reply with the track's immutable [`lite::TrackInfo`] + /// and FIN, or reset on error (e.g. the track does not exist). Lite05+ only. + /// + /// Runs inline in its own control-stream task (see [`Self::handle`]); resolving + /// the info can be a cold upstream TRACK fetch, but that only blocks this task. + pub async fn recv_track(&self, mut stream: Stream) -> Result<(), Error> { + let req = stream.reader.decode::().await?; + + let track = req.track.to_string(); + let absolute = self.origin.absolute(&req.broadcast).to_owned(); + + tracing::debug!(broadcast = %absolute, %track, "track info requested"); + + let broadcast = self.origin.get_broadcast(&req.broadcast); + + if let Err(err) = Self::run_track_info(&mut stream, &track, broadcast, self.version).await { + match &err { + Error::Cancel | Error::Transport(_) => { + tracing::debug!(broadcast = %absolute, %track, "track info cancelled") + } + err => tracing::warn!(broadcast = %absolute, %track, %err, "track info error"), + } + stream.writer.abort(&err); + } + + Ok(()) + } + + async fn run_track_info( + stream: &mut Stream, + track_name: &str, + consumer: Option, + version: Version, + ) -> Result<(), Error> { + let broadcast = consumer.ok_or(Error::NotFound)?; + + // Resolve the immutable properties without subscribing. Warm (a producer + // exists or the info is cached) this returns with no round trip; cold (a + // relay with no prior subscription) it triggers a single upstream TRACK + // fetch via the model's info-request channel. + let track = broadcast.consume_track(track_name).info().await?; + + // Mirror the negotiation in `run_subscribe` so the subscriber decodes + // frames the same way it'll see them served. + let supports_compression = !matches!( + version, + Version::Lite01 | Version::Lite02 | Version::Lite03 | Version::Lite04 + ); + let compression = if track.compress && supports_compression { + Compression::Deflate + } else { + Compression::None + }; + let timescale = if version.has_timestamps() { + track.timescale + } else { + None + }; + + let info = lite::TrackInfo { + // The model carries no publisher-chosen priority/order yet, so both + // default to the tie-break-neutral values. + priority: 0, + ordered: false, + cache: track.cache, + timescale, + compression, + }; + + stream.writer.encode(&info).await?; + stream.writer.finish()?; + stream.writer.closed().await + } } /// Shared per-subscription state for the publisher side. Cloned (cheaply — every diff --git a/rs/moq-net/src/lite/stream.rs b/rs/moq-net/src/lite/stream.rs index d52a77943..fda6bbefb 100644 --- a/rs/moq-net/src/lite/stream.rs +++ b/rs/moq-net/src/lite/stream.rs @@ -13,6 +13,9 @@ pub enum ControlType { Fetch = 3, Probe = 4, Goaway = 5, + /// Track Stream: a subscriber requests a track's immutable publisher + /// properties (TRACK_INFO) without subscribing or fetching. Lite05+ only. + Track = 6, } impl Decode for ControlType { diff --git a/rs/moq-net/src/lite/subscribe.rs b/rs/moq-net/src/lite/subscribe.rs index 8f17a003a..fb0901a38 100644 --- a/rs/moq-net/src/lite/subscribe.rs +++ b/rs/moq-net/src/lite/subscribe.rs @@ -1,7 +1,7 @@ use std::borrow::Cow; use crate::{ - Compression, Path, Timescale, + Path, coding::{Decode, DecodeError, Encode, EncodeError, Sizer}, }; @@ -72,6 +72,11 @@ impl Message for Subscribe<'_> { } } +/// Sent by the publisher to accept a subscription (Lite01-04 only). +/// +/// On Lite05+ a subscription is accepted implicitly (rejection is a stream +/// reset) and the immutable publisher properties moved to [`TrackInfo`], fetched +/// once over a [Track Stream](super::Track). This message is no longer sent. #[derive(Clone, Debug)] pub struct SubscribeOk { pub priority: u8, @@ -79,21 +84,6 @@ pub struct SubscribeOk { pub max_latency: std::time::Duration, pub start_group: Option, pub end_group: Option, - /// Codec the publisher will use for every frame on this track. Negotiated - /// here (not in SUBSCRIBE) so the subscriber blocks on this message before it - /// can decode any frame payload. Lite05+ only; older drafts always get - /// [`Compression::None`]. - pub compression: Compression, - /// Per-frame timestamp scale advertised by the publisher. `None` means the - /// publisher doesn't carry per-frame timestamps on the wire (so frame - /// headers omit them). Lite05+ only; older drafts always decode as `None`. - /// On the wire `None` is `0` and `Some(n)` is `n`. - pub timescale: Option, - /// How long the publisher keeps old groups available before evicting them. - /// A relay re-serves with the same window and clamps each subscriber's stale - /// preference to it. Lite05+ only; older drafts always get - /// [`crate::DEFAULT_CACHE`]. - pub cache: std::time::Duration, } impl Message for SubscribeOk { @@ -103,23 +93,14 @@ impl Message for SubscribeOk { self.priority.encode(w, version)?; } Version::Lite02 => {} - Version::Lite03 | Version::Lite04 => { - self.priority.encode(w, version)?; - (self.ordered as u8).encode(w, version)?; - self.max_latency.encode(w, version)?; - self.start_group.encode(w, version)?; - self.end_group.encode(w, version)?; - } + // Lite05+ no longer sends SUBSCRIBE_OK, but keep the Lite03/04 layout as + // the forward default so an accidental encode stays well-formed. _ => { self.priority.encode(w, version)?; (self.ordered as u8).encode(w, version)?; self.max_latency.encode(w, version)?; self.start_group.encode(w, version)?; self.end_group.encode(w, version)?; - // Order matches draft-lcurley-moq-lite-05 SUBSCRIBE_OK: Timescale, Cache, Compression. - self.timescale.map(u64::from).unwrap_or(0).encode(w, version)?; - self.cache.encode(w, version)?; - self.compression.to_code().encode(w, version)?; } } @@ -134,9 +115,6 @@ impl Message for SubscribeOk { max_latency: std::time::Duration::ZERO, start_group: None, end_group: None, - compression: Compression::None, - timescale: None, - cache: crate::DEFAULT_CACHE, }), Version::Lite02 => Ok(Self { priority: 0, @@ -144,51 +122,14 @@ impl Message for SubscribeOk { max_latency: std::time::Duration::ZERO, start_group: None, end_group: None, - compression: Compression::None, - timescale: None, - cache: crate::DEFAULT_CACHE, }), - Version::Lite03 | Version::Lite04 => { - let priority = u8::decode(r, version)?; - let ordered = u8::decode(r, version)? != 0; - let max_latency = std::time::Duration::decode(r, version)?; - let start_group = Option::::decode(r, version)?; - let end_group = Option::::decode(r, version)?; - - Ok(Self { - priority, - ordered, - max_latency, - start_group, - end_group, - compression: Compression::None, - timescale: None, - cache: crate::DEFAULT_CACHE, - }) - } - _ => { - let priority = u8::decode(r, version)?; - let ordered = u8::decode(r, version)? != 0; - let max_latency = std::time::Duration::decode(r, version)?; - let start_group = Option::::decode(r, version)?; - let end_group = Option::::decode(r, version)?; - // Order matches draft-lcurley-moq-lite-05 SUBSCRIBE_OK: Timescale, Cache, Compression. - let timescale = Timescale::new(u64::decode(r, version)?).ok(); - let cache = std::time::Duration::decode(r, version)?; - let compression = - Compression::from_code(u64::decode(r, version)?).map_err(|_| DecodeError::InvalidValue)?; - - Ok(Self { - priority, - ordered, - max_latency, - start_group, - end_group, - compression, - timescale, - cache, - }) - } + _ => Ok(Self { + priority: u8::decode(r, version)?, + ordered: u8::decode(r, version)? != 0, + max_latency: std::time::Duration::decode(r, version)?, + start_group: Option::::decode(r, version)?, + end_group: Option::::decode(r, version)?, + }), } } } @@ -398,9 +339,6 @@ mod test { max_latency: Duration::from_millis(250), start_group: Some(3), end_group: None, - compression: Compression::Deflate, - timescale: Some(Timescale::MICRO), - cache: Duration::from_secs(10), } } @@ -412,60 +350,12 @@ mod test { } #[test] - fn compression_roundtrips_on_lite05() { - let got = roundtrip(Version::Lite05Wip, &sample()); - assert_eq!(got.compression, Compression::Deflate); + fn fields_roundtrip_on_lite04() { + let got = roundtrip(Version::Lite04, &sample()); assert_eq!(got.priority, 7); assert!(got.ordered); + assert_eq!(got.max_latency, Duration::from_millis(250)); assert_eq!(got.start_group, Some(3)); assert_eq!(got.end_group, None); } - - #[test] - fn compression_absent_before_lite05() { - let ok = sample(); - - // The compression varint only exists on lite-05+, so the older encoding is - // strictly shorter and always decodes back as None. - let mut buf04 = Vec::new(); - ok.encode_msg(&mut buf04, Version::Lite04).unwrap(); - let mut buf05 = Vec::new(); - ok.encode_msg(&mut buf05, Version::Lite05Wip).unwrap(); - assert!( - buf05.len() > buf04.len(), - "lite-05 carries extra compression + timescale varints" - ); - - assert_eq!(roundtrip(Version::Lite04, &ok).compression, Compression::None); - } - - #[test] - fn timescale_roundtrips_on_lite05() { - let got = roundtrip(Version::Lite05Wip, &sample()); - assert_eq!(got.timescale, Some(Timescale::MICRO)); - } - - #[test] - fn timescale_absent_before_lite05() { - // Lite04 doesn't carry the timescale varint, so it always decodes as None. - assert_eq!(roundtrip(Version::Lite04, &sample()).timescale, None); - } - - #[test] - fn timescale_zero_on_wire_decodes_as_none() { - let mut ok = sample(); - ok.timescale = None; - assert_eq!(roundtrip(Version::Lite05Wip, &ok).timescale, None); - } - - #[test] - fn cache_roundtrips_on_lite05() { - assert_eq!(roundtrip(Version::Lite05Wip, &sample()).cache, Duration::from_secs(10)); - } - - #[test] - fn cache_absent_before_lite05() { - // Lite04 doesn't carry the cache varint, so it always decodes as the default. - assert_eq!(roundtrip(Version::Lite04, &sample()).cache, crate::DEFAULT_CACHE); - } } diff --git a/rs/moq-net/src/lite/subscriber.rs b/rs/moq-net/src/lite/subscriber.rs index 3e5ee4e6d..d3f4d1e8b 100644 --- a/rs/moq-net/src/lite/subscriber.rs +++ b/rs/moq-net/src/lite/subscriber.rs @@ -9,8 +9,8 @@ use futures::{StreamExt, stream::FuturesUnordered}; use crate::{ AsPath, BandwidthProducer, Broadcast, BroadcastDynamic, Compression, Error, Frame, FrameProducer, Group, - GroupProducer, MAX_FRAME_SIZE, OriginProducer, Path, PathOwned, StatsHandle, SubscriberStats, SubscriberTrack, - Timescale, Timestamp, Track, TrackProducer, TrackRequest, + GroupProducer, MAX_FRAME_SIZE, OriginProducer, Path, PathOwned, PendingTrack, StatsHandle, SubscriberStats, + SubscriberTrack, Subscription, Timescale, Timestamp, Track, TrackProducer, coding::{Reader, Stream}, lite, model::BroadcastProducer, @@ -61,12 +61,23 @@ pub(super) struct Subscriber { #[derive(Clone)] struct TrackEntry { - producer: TrackProducer, stats: Arc, - /// The SUBSCRIBE_OK for this subscription. `None` until it arrives; group - /// streams block on it before decoding any frame, since a group can race - /// ahead of SUBSCRIBE_OK on its own QUIC stream. - subscribe_ok: kio::Consumer>, + /// Resolves once the upstream subscription is accepted: after TRACK_INFO on + /// lite-05, after SUBSCRIBE_OK on older drafts. Group streams park on this so a + /// group that races ahead of acceptance buffers (in QUIC flow control) instead + /// of being dropped. `None` until resolved; a closed channel means the + /// subscription ended first, which group streams treat as cancelled. + resolved: kio::Consumer>, +} + +/// The decoded-once-per-track state a group stream needs: where to write groups +/// and how to parse their frames. Populated from TRACK_INFO (lite-05) so a single +/// lookup is reused across every group instead of re-derived per response. +#[derive(Clone)] +struct ResolvedTrack { + producer: TrackProducer, + compression: Compression, + timescale: Option, } /// Result of an upstream subscribe lifecycle. @@ -452,10 +463,9 @@ impl Subscriber { } async fn run_broadcast(self, path: PathOwned, mut broadcast: BroadcastDynamic) { - // Actually start serving subscriptions. + // Serve track requests (subscribe and info-only, coalesced) until the + // broadcast is gone. A request with no subscriber is an info-only lookup. loop { - // Keep serving requests until there are no more consumers. - // This way we'll clean up the task when the broadcast is no longer needed. let request = tokio::select! { request = broadcast.requested_track() => match request { Ok(request) => request, @@ -476,16 +486,52 @@ impl Subscriber { } } - /// Drive one upstream subscription end-to-end, including linger across consumer churn. + /// Forward the aggregate downstream preferences upstream as a SUBSCRIBE. + async fn open_subscribe( + &self, + id: u64, + name: &str, + path: &PathOwned, + sub: &Subscription, + ) -> Result, Error> { + let mut stream = Stream::open(&self.session, self.version).await?; + stream.writer.encode(&lite::ControlType::Subscribe).await?; + let msg = lite::Subscribe { + id, + broadcast: path.as_path(), + track: name.into(), + priority: sub.priority, + ordered: sub.ordered, + max_latency: sub.stale, + start_group: sub.group_start, + end_group: sub.group_end, + }; + stream.writer.encode(&msg).await?; + Ok(stream) + } + + /// Resolve a track's immutable props on lite-05 via a single upstream TRACK fetch. /// - /// On linger entry (last consumer drops) we send `SubscribeUpdate(priority=0, - /// end_group=Some(latest))`. The publisher treats `end_group` as a serving cap, - /// not a terminator: it holds any groups beyond the cap and resumes when we - /// raise it. On resume (a new consumer arrives) we send `SubscribeUpdate(end_group=None)` - /// to uncap. The stream stays open across the whole lifecycle — only a timeout - /// or a publisher-side close ends it. This avoids the stream-churn / duplicate-fetch - /// race that an unsubscribe-and-reissue approach would have. - async fn run_subscribe(&mut self, path: PathOwned, broadcast: BroadcastDynamic, request: TrackRequest) { + /// A repeat subscribe reuses the live producer in the model (so it never reaches + /// here), and the resolved props land back in the model's `tracks` once `accept` + /// runs, so a later `info()` is warm. + async fn resolve_props(&self, path: &PathOwned, name: &str) -> Result { + let info = self.fetch_track_info(&path.as_path(), name).await?; + let mut props = Track::new(name); + props.timescale = info.timescale; + props.cache = info.cache; + // The model carries compression as a bool; the codec set is {none, deflate}, + // so the flag round-trips losslessly. + props.compress = info.compression != Compression::None; + Ok(props) + } + + /// Serve one track request: resolve its immutable info, and — if there's group + /// demand — drive the upstream subscription with linger across consumer churn. + /// + /// A request with no subscriber (`info()` only) just resolves and caches the + /// info; no upstream SUBSCRIBE is opened. When demand appears, we open it then. + async fn run_subscribe(&mut self, path: PathOwned, broadcast: BroadcastDynamic, request: PendingTrack) { // Subscriber-side track stats; counters bump as frames/bytes/groups arrive. // Drop on subscription end records `subscriber.subscriptions_closed`. We use // subscriber_track to avoid double-counting broadcasts: the broadcast lifetime @@ -493,29 +539,13 @@ impl Subscriber { let name = request.name().to_string(); let abs = self.origin.absolute(&path); let track_stats = Arc::new(self.stats.broadcast(&abs).subscriber_track(&name)); - // The per-(session, broadcast) `broadcasts` sentinel is taken later, once - // the upstream confirms with SUBSCRIBE_OK (see `run_subscribe_session`), so a - // sub cancelled before then isn't counted as a feeding session. let id = self.next_id.fetch_add(1, atomic::Ordering::Relaxed); - // Forward the aggregate of every downstream subscriber's preferences upstream. - let subscription = request.subscription().clone(); - let msg = lite::Subscribe { - id, - broadcast: path.as_path(), - track: (&name).into(), - priority: subscription.priority, - ordered: subscription.ordered, - max_latency: subscription.stale, - start_group: subscription.group_start, - end_group: subscription.group_end, - }; - - tracing::info!(id, broadcast = %self.log_path(&path), track = %name, "subscribe started"); + tracing::info!(id, broadcast = %self.log_path(&path), track = %name, "track requested"); let result = self - .run_subscribe_session(id, &name, request, track_stats, &broadcast, msg) + .run_subscribe_session(id, &name, request, track_stats, &broadcast, &path) .await; self.subscribes.lock().remove(&id); @@ -536,111 +566,215 @@ impl Subscriber { } } - /// Open the upstream subscribe stream, wait for SUBSCRIBE_OK, then accept the - /// pending request (unblocking the downstream subscriber) and run the linger - /// lifecycle. The producer is created only after SUBSCRIBE_OK, so a downstream - /// a downstream `subscribe` resolves exactly when the upstream confirms. + /// Resolve the track's immutable props, then — if there's group demand — accept + /// the producer and drive the upstream subscription with linger. + /// + /// On lite-05 the props come from the cache or a TRACK_INFO stream flighted + /// alongside SUBSCRIBE (so the first group still arrives in one round trip); + /// older drafts read them (absent) from SUBSCRIBE_OK. A pure `info()` request + /// (no subscriber) resolves the props and caches them without subscribing. async fn run_subscribe_session( &self, id: u64, name: &str, - request: TrackRequest, + request: PendingTrack, track_stats: Arc, broadcast: &BroadcastDynamic, - msg: lite::Subscribe<'_>, + path: &PathOwned, ) -> SessionOutcome { - // Stash the original parameters so SubscribeUpdate messages can echo them - // while only varying the linger-related fields (priority, end_group). - let original_priority = msg.priority; - let ordered = msg.ordered; - let max_latency = msg.max_latency; - let start_group = msg.start_group; - - // SubscribeUpdate only exists on Lite03+; older versions take the - // immediate-FIN path with no linger. - let supports_linger = !matches!(self.version, Version::Lite01 | Version::Lite02); - - let mut stream = match Stream::open(&self.session, self.version).await { - Ok(s) => s, - Err(err) => { - request.deny(err.clone()); - return SessionOutcome::Error(err); - } - }; + // Pending entry up front so a group stream that races ahead of acceptance + // parks on `resolved` instead of being dropped. Held for the session's + // lifetime; dropping it closes the channel and wakes parked group streams. + let resolved_tx: kio::Producer> = kio::Producer::new(None); + self.subscribes.lock().insert( + id, + TrackEntry { + stats: track_stats, + resolved: resolved_tx.consume(), + }, + ); - if let Err(err) = stream.writer.encode(&lite::ControlType::Subscribe).await { - request.deny(err.clone()); - return SessionOutcome::Error(err); - } + // Group demand at hand-out? Always true before lite-05 (which has no info() + // callers); on lite-05 a pure TRACK request has none. + let initial_sub = request.subscription(); + let lite05 = matches!(self.version, Version::Lite05Wip); - if let Err(err) = stream.writer.encode(&msg).await { - stream.writer.abort(&err); - request.deny(err.clone()); - return SessionOutcome::Error(err); - } + // The upstream subscribe stream, opened only when there's group demand. + let mut stream: Option> = None; - // The first response MUST be a SUBSCRIBE_OK. Bail if the broadcast dies first. - let resp = tokio::select! { - err = broadcast.closed() => { - request.deny(err.clone()); - return SessionOutcome::BroadcastClosed(err); + // Resolve the immutable props; flight SUBSCRIBE in parallel when wanted. + let (compression, timescale, cache) = if lite05 { + // Flight SUBSCRIBE now if there's demand, so it races the info fetch (1 RTT). + if let Some(sub) = &initial_sub { + match self.open_subscribe(id, name, path, sub).await { + Ok(s) => stream = Some(s), + Err(err) => { + request.deny(err.clone()); + return SessionOutcome::Error(err); + } + } } - resp = stream.reader.decode::() => match resp { - Ok(r) => r, + + let props = tokio::select! { + err = broadcast.closed() => { + request.deny(err.clone()); + return SessionOutcome::BroadcastClosed(err); + } + res = self.resolve_props(path, name) => match res { + Ok(props) => props, + Err(err) => { + if let Some(s) = &mut stream { + s.writer.abort(&err); + } + request.deny(err.clone()); + return SessionOutcome::Error(err); + } + } + }; + let compression = if props.compress { + Compression::Deflate + } else { + Compression::None + }; + (compression, props.timescale, props.cache) + } else { + // Older drafts: open the subscribe stream and read SUBSCRIBE_OK (no props). + let sub = initial_sub.clone().unwrap_or_default(); + let mut s = match self.open_subscribe(id, name, path, &sub).await { + Ok(s) => s, Err(err) => { - stream.writer.abort(&err); request.deny(err.clone()); return SessionOutcome::Error(err); } + }; + let resp = tokio::select! { + err = broadcast.closed() => { + request.deny(err.clone()); + return SessionOutcome::BroadcastClosed(err); + } + resp = s.reader.decode::() => match resp { + Ok(r) => r, + Err(err) => { + s.writer.abort(&err); + request.deny(err.clone()); + return SessionOutcome::Error(err); + } + } + }; + if !matches!(resp, lite::SubscribeResponse::Ok(_)) { + let err = Error::ProtocolViolation; + s.writer.abort(&err); + request.deny(err.clone()); + return SessionOutcome::Error(err); } - }; - let lite::SubscribeResponse::Ok(info) = resp else { - let err = Error::ProtocolViolation; - stream.writer.abort(&err); - request.deny(err.clone()); - return SessionOutcome::Error(err); + stream = Some(s); + (Compression::None, None, crate::DEFAULT_CACHE) }; - // Upstream confirmed the subscription, so this session is now actively - // feeding the broadcast: take the per-(session, broadcast) sentinel. It - // drops when this fn returns (subscription end / cancel), releasing - // `broadcasts_closed`. Taken only after SUBSCRIBE_OK so a sub cancelled - // before confirmation isn't counted as a feeding session. - let abs = self.origin.absolute(&msg.broadcast); + // Accept: create the producer, resolving info + subscriber waiters. Stamp + // the negotiated timescale and cache window onto the local Track so groups + // inherit the timescale (validated at the model layer) and the producer + // evicts (and clamps downstream stale windows) with the same bound. + let abs = self.origin.absolute(path); let _broadcast_sub = self.broadcasts.subscribe(&abs); - // The publisher accepted: create the producer (unblocking the downstream - // subscriber) and start routing incoming groups to it. SUBSCRIBE_OK is known - // now, so the group streams never have to wait; they still read it through a - // kio channel (a group's QUIC stream can otherwise race ahead of SUBSCRIBE_OK). - // - // Stamp the negotiated timescale onto the local Track so groups inherit - // it and downstream consumers (including this subscriber's frame decode - // path) can validate per-frame timestamps at the model layer. let mut local_info = Track::new(name); - local_info.timescale = info.timescale; - // Carry the publisher's cache window so the local producer evicts (and - // clamps downstream stale windows) with the same bound when re-served. - local_info.cache = info.cache; + local_info.timescale = timescale; + local_info.cache = cache; let mut track = match request.accept(local_info) { Ok(track) => track, Err(err) => { - stream.writer.abort(&err); + if let Some(s) = &mut stream { + s.writer.abort(&err); + } return SessionOutcome::Error(err); } }; - let subscribe_ok = kio::Producer::new(Some(info)).consume(); - self.subscribes.lock().insert( - id, - TrackEntry { + + // Resolve the pending entry: parked group streams can now create groups + // (with the right timescale) and decode frames. + if let Ok(mut resolved) = resolved_tx.write() { + *resolved = Some(ResolvedTrack { producer: track.clone(), - stats: track_stats, - subscribe_ok, - }, - ); + compression, + timescale, + }); + } + + // If we didn't open the stream eagerly (info-only at hand-out), a subscriber + // may have coalesced during the info fetch: open it now (props are cached, so + // just SUBSCRIBE). Otherwise wait briefly for one, else drop (info stays cached). + if stream.is_none() { + loop { + if let Some(sub) = track.subscription() { + match self.open_subscribe(id, name, path, &sub).await { + Ok(s) => { + stream = Some(s); + break; + } + Err(err) => { + let _ = track.abort(err.clone()); + return SessionOutcome::Error(err); + } + } + } else { + tokio::select! { + _ = track.used() => continue, + err = broadcast.closed() => { + let _ = track.abort(err.clone()); + return SessionOutcome::BroadcastClosed(err); + } + _ = tokio::time::sleep(LINGER_TIMEOUT) => { + let _ = track.finish(); + return SessionOutcome::Cancelled; + } + } + } + } + } - // Lifecycle loop: serve → linger → resume → serve → ... → FIN. - let outcome = 'lifecycle: loop { + let mut stream = stream.expect("subscribe stream is open once there's group demand"); + let sub = track.subscription().or(initial_sub).unwrap_or_default(); + let outcome = self.serve_lifecycle(&mut stream, &mut track, broadcast, &sub).await; + + // Apply the outcome to the producer that downstream consumers read from. + match &outcome { + SessionOutcome::Complete => { + let _ = track.finish(); + } + SessionOutcome::Cancelled => { + let _ = track.abort(Error::Cancel); + } + SessionOutcome::BroadcastClosed(err) | SessionOutcome::Error(err) => { + let _ = track.abort(err.clone()); + } + } + + outcome + } + + /// The linger lifecycle on an open subscribe stream: serve → linger → resume → + /// ... → FIN. + /// + /// On linger entry (last consumer drops) we send `SubscribeUpdate(priority=0, + /// end_group=Some(latest))`. The publisher treats `end_group` as a serving cap, + /// not a terminator: it holds any groups beyond the cap and resumes when we + /// raise it. On resume (a new consumer arrives) we uncap with `end_group=None`. + /// The stream stays open across the whole lifecycle — only a timeout or a + /// publisher-side close ends it, avoiding the stream-churn an unsubscribe-and- + /// reissue approach would have. + async fn serve_lifecycle( + &self, + stream: &mut Stream, + track: &mut TrackProducer, + broadcast: &BroadcastDynamic, + sub: &Subscription, + ) -> SessionOutcome { + // SubscribeUpdate only exists on Lite03+; older versions take the + // immediate-FIN path with no linger. + let supports_linger = !matches!(self.version, Version::Lite01 | Version::Lite02); + + 'lifecycle: loop { // Phase 1 — serving. Wait for the last consumer to drop (enter linger), // the broadcast to die, or the upstream to close the stream. tokio::select! { @@ -655,22 +789,21 @@ impl Subscriber { }, } - // No linger on Lite01/02: FIN and report cancellation. if !supports_linger { let _ = stream.writer.finish(); break 'lifecycle SessionOutcome::Cancelled; } // Phase 2 — linger. Cap the publisher's serving cursor at the latest - // group we've cached and drop priority to 0; the publisher holds any - // group beyond the cap until we resume or FIN. `unwrap_or(0)` handles - // the corner case where we subscribed but haven't received a group yet. + // cached group and drop priority to 0; the publisher holds any group + // beyond the cap until we resume or FIN. `unwrap_or(0)` handles the case + // where we subscribed but haven't received a group yet. let cap = track.latest().unwrap_or(0); let pause = lite::SubscribeUpdate { priority: 0, - ordered, - max_latency, - start_group, + ordered: sub.ordered, + max_latency: sub.stale, + start_group: sub.group_start, end_group: Some(cap), }; if let Err(err) = stream.writer.encode(&pause).await { @@ -700,10 +833,10 @@ impl Subscriber { tracing::info!(track = %track.name, "subscribe resumed"); let uncap = lite::SubscribeUpdate { - priority: original_priority, - ordered, - max_latency, - start_group, + priority: sub.priority, + ordered: sub.ordered, + max_latency: sub.stale, + start_group: sub.group_start, end_group: None, }; if let Err(err) = stream.writer.encode(&uncap).await { @@ -711,58 +844,50 @@ impl Subscriber { break 'lifecycle SessionOutcome::Error(err); } // Loop back to Phase 1. - }; - - // Apply the outcome to the producer that downstream consumers read from. - match &outcome { - SessionOutcome::Complete => { - let _ = track.finish(); - } - SessionOutcome::Cancelled => { - let _ = track.abort(Error::Cancel); - } - SessionOutcome::BroadcastClosed(err) | SessionOutcome::Error(err) => { - let _ = track.abort(err.clone()); - } } + } - outcome + /// Open a Track Stream, send TRACK, and read the single TRACK_INFO reply. + /// + /// The publisher FINs after TRACK_INFO (or resets on error, e.g. the track + /// does not exist); we drop the stream once the reply is in. Lite05+ only. + async fn fetch_track_info(&self, broadcast: &Path<'_>, name: &str) -> Result { + let mut stream = Stream::open(&self.session, self.version).await?; + stream.writer.encode(&lite::ControlType::Track).await?; + let req = lite::Track { + broadcast: broadcast.clone(), + track: name.into(), + }; + stream.writer.encode(&req).await?; + + let info = stream.reader.decode::().await?; + let _ = stream.writer.finish(); + Ok(info) } pub async fn recv_group(&mut self, stream: &mut Reader) -> Result<(), Error> { let hdr: lite::Group = stream.decode().await?; - let (mut group, track, track_stats, subscribe_ok) = { - let mut subs = self.subscribes.lock(); - let entry = subs.get_mut(&hdr.subscribe).ok_or(Error::Cancel)?; - - let group_info = Group { sequence: hdr.sequence }; - let group = entry.producer.create_group(group_info)?; - ( - group, - entry.producer.clone(), - entry.stats.clone(), - entry.subscribe_ok.clone(), - ) + let (resolved, track_stats) = { + let subs = self.subscribes.lock(); + let entry = subs.get(&hdr.subscribe).ok_or(Error::Cancel)?; + (entry.resolved.clone(), entry.stats.clone()) }; - // Bump groups counter for this incoming group on the subscriber side. - track_stats.group(); - - // Block until SUBSCRIBE_OK arrives. The group's QUIC stream can arrive - // before SUBSCRIBE_OK lands on the subscribe stream, so we can't decode - // frames until this resolves. A closed channel means the subscription - // ended before SUBSCRIBE_OK, so treat it as cancelled. + // Block until the upstream is accepted and TRACK_INFO is known. The group's + // QUIC stream can arrive before that resolves; its unread bytes stay + // buffered by QUIC flow control until we create the group below. A closed + // channel means the subscription ended first, so treat it as cancelled. // // Map the closed `Ref` to `None` inside the poll closure (rather than using // `Consumer::wait`) so the `!Send` guard never enters this spawned future. - let (compression, timescale) = kio::wait(|waiter| { - let poll = subscribe_ok.poll(waiter, |ok| match &**ok { - Some(ok) => Poll::Ready((ok.compression, ok.timescale)), + let resolved = kio::wait(|waiter| { + let poll = resolved.poll(waiter, |r| match &**r { + Some(r) => Poll::Ready(r.clone()), None => Poll::Pending, }); match poll { - Poll::Ready(Ok(pair)) => Poll::Ready(Some(pair)), + Poll::Ready(Ok(r)) => Poll::Ready(Some(r)), Poll::Ready(Err(_closed)) => Poll::Ready(None), Poll::Pending => Poll::Pending, } @@ -770,10 +895,18 @@ impl Subscriber { .await .ok_or(Error::Cancel)?; + // Create the group now that the timescale is known, so it inherits the right + // per-frame timestamp scale. + let mut producer = resolved.producer.clone(); + let mut group = producer.create_group(Group { sequence: hdr.sequence })?; + + // Bump groups counter for this incoming group on the subscriber side. + track_stats.group(); + let res = tokio::select! { - err = track.closed() => Err(err), + err = producer.closed() => Err(err), err = group.closed() => Err(err), - res = self.run_group(stream, group.clone(), track_stats.clone(), compression, timescale) => res, + res = self.run_group(stream, group.clone(), track_stats.clone(), resolved.compression, resolved.timescale) => res, }; match res { diff --git a/rs/moq-net/src/lite/track.rs b/rs/moq-net/src/lite/track.rs new file mode 100644 index 000000000..26834459d --- /dev/null +++ b/rs/moq-net/src/lite/track.rs @@ -0,0 +1,183 @@ +use std::borrow::Cow; + +use crate::{ + Compression, Path, Timescale, + coding::{Decode, DecodeError, Encode, EncodeError}, +}; + +use super::{Message, Version}; + +/// Sent by the subscriber to open a Track Stream (0x6), requesting a track's +/// immutable publisher properties without subscribing or fetching. +/// +/// The publisher replies with a single [`TrackInfo`] and then FINs the stream, +/// or resets it on error (e.g. the track does not exist). Lite05+ only. +#[derive(Clone, Debug)] +pub struct Track<'a> { + pub broadcast: Path<'a>, + pub track: Cow<'a, str>, +} + +impl Message for Track<'_> { + fn decode_msg(r: &mut R, version: Version) -> Result { + match version { + Version::Lite01 | Version::Lite02 | Version::Lite03 | Version::Lite04 => { + return Err(DecodeError::Version); + } + _ => {} + } + + let broadcast = Path::decode(r, version)?; + let track = Cow::::decode(r, version)?; + + Ok(Self { broadcast, track }) + } + + fn encode_msg(&self, w: &mut W, version: Version) -> Result<(), EncodeError> { + match version { + Version::Lite01 | Version::Lite02 | Version::Lite03 | Version::Lite04 => { + return Err(EncodeError::Version); + } + _ => {} + } + + self.broadcast.encode(w, version)?; + self.track.encode(w, version)?; + + Ok(()) + } +} + +/// Sent by the publisher in response to a [`Track`] request, carrying the track's +/// immutable publisher properties. It is the sole message on the Track Stream; the +/// publisher FINs immediately afterward, or resets the stream on error. +/// +/// Every field is fixed for the lifetime of the track. Fetched once and cached by +/// the subscriber, so the properties are no longer echoed on every SUBSCRIBE/FETCH +/// response. Lite05+ only. +#[derive(Clone, Debug)] +pub struct TrackInfo { + /// The publisher's priority for this track, used only to resolve ties between + /// subscriptions of equal subscriber priority. + pub priority: u8, + /// The publisher's group ordering preference, used only to resolve ties. + pub ordered: bool, + /// How long the publisher keeps old groups available before evicting them. A + /// relay re-serves with the same window and clamps each subscriber's stale + /// preference to it. + pub cache: std::time::Duration, + /// Per-frame timestamp scale. `None` (wire `0`) means the publisher doesn't + /// carry per-frame timestamps, so frame headers omit them. + pub timescale: Option, + /// Codec applied to every frame payload on this track. The subscriber needs + /// this (and `timescale`) before it can decode any frame. + pub compression: Compression, +} + +impl Message for TrackInfo { + fn encode_msg(&self, w: &mut W, version: Version) -> Result<(), EncodeError> { + match version { + Version::Lite01 | Version::Lite02 | Version::Lite03 | Version::Lite04 => { + return Err(EncodeError::Version); + } + _ => {} + } + + // Order matches draft-lcurley-moq-lite-05 TRACK_INFO: Priority, Ordered, + // Cache, Timescale, Compression. + self.priority.encode(w, version)?; + (self.ordered as u8).encode(w, version)?; + self.cache.encode(w, version)?; + self.timescale.map(u64::from).unwrap_or(0).encode(w, version)?; + self.compression.to_code().encode(w, version)?; + + Ok(()) + } + + fn decode_msg(r: &mut R, version: Version) -> Result { + match version { + Version::Lite01 | Version::Lite02 | Version::Lite03 | Version::Lite04 => { + return Err(DecodeError::Version); + } + _ => {} + } + + let priority = u8::decode(r, version)?; + let ordered = u8::decode(r, version)? != 0; + let cache = std::time::Duration::decode(r, version)?; + let timescale = Timescale::new(u64::decode(r, version)?).ok(); + let compression = Compression::from_code(u64::decode(r, version)?).map_err(|_| DecodeError::InvalidValue)?; + + Ok(Self { + priority, + ordered, + cache, + timescale, + compression, + }) + } +} + +#[cfg(test)] +mod test { + use std::time::Duration; + + use super::*; + + fn sample() -> TrackInfo { + TrackInfo { + priority: 7, + ordered: true, + cache: Duration::from_secs(10), + timescale: Some(Timescale::MICRO), + compression: Compression::Deflate, + } + } + + fn roundtrip(info: &TrackInfo) -> TrackInfo { + let mut buf = Vec::new(); + info.encode_msg(&mut buf, Version::Lite05Wip).unwrap(); + let mut slice = buf.as_slice(); + TrackInfo::decode_msg(&mut slice, Version::Lite05Wip).unwrap() + } + + #[test] + fn track_info_roundtrips() { + let got = roundtrip(&sample()); + assert_eq!(got.priority, 7); + assert!(got.ordered); + assert_eq!(got.cache, Duration::from_secs(10)); + assert_eq!(got.timescale, Some(Timescale::MICRO)); + assert_eq!(got.compression, Compression::Deflate); + } + + #[test] + fn timescale_zero_decodes_as_none() { + let mut info = sample(); + info.timescale = None; + assert_eq!(roundtrip(&info).timescale, None); + } + + #[test] + fn rejected_before_lite05() { + let mut buf = Vec::new(); + assert!(matches!( + sample().encode_msg(&mut buf, Version::Lite04), + Err(EncodeError::Version) + )); + } + + #[test] + fn track_request_roundtrips() { + let req = Track { + broadcast: Path::new("room/1"), + track: Cow::Borrowed("video"), + }; + let mut buf = Vec::new(); + req.encode_msg(&mut buf, Version::Lite05Wip).unwrap(); + let mut slice = buf.as_slice(); + let got = Track::decode_msg(&mut slice, Version::Lite05Wip).unwrap(); + assert_eq!(got.broadcast, Path::new("room/1")); + assert_eq!(got.track, "video"); + } +} diff --git a/rs/moq-net/src/model/broadcast.rs b/rs/moq-net/src/model/broadcast.rs index ab6b53bc5..eefe200be 100644 --- a/rs/moq-net/src/model/broadcast.rs +++ b/rs/moq-net/src/model/broadcast.rs @@ -42,14 +42,37 @@ type PendingSlot = Option>; /// One waiting subscriber: its preferences and the producer side of its resolver channel. type Resolver = (Subscription, kio::Producer); -/// A track that has been subscribed to but not yet served by the dynamic handler. +/// The slot a pending `info()` resolves into: `None` until accept delivers the +/// track's immutable [`Track`] (or a denial). The info-only analogue of +/// [`PendingSlot`]; it never creates a subscription. +type InfoSlot = Option>; + +/// One waiting `info()` caller: the producer side of its resolver channel. +type InfoResolver = kio::Producer; + +/// A track that has been requested but not yet served by the dynamic handler. /// -/// Multiple subscribers to the same name before it is accepted coalesce into one -/// pending request, each adding a resolver channel so they all receive a consumer -/// for the same producer once it is accepted. +/// Subscribers and `info()` callers for the same name before it is accepted +/// coalesce into one pending request: each subscriber adds a resolver channel (so +/// they all receive a consumer for the same producer) and each `info()` caller +/// adds an info resolver (so they all receive the resolved [`Track`]). A single +/// `accept` resolves both. #[derive(Default)] struct PendingRequest { resolvers: Vec, + info_resolvers: Vec, +} + +impl PendingRequest { + /// Fail every waiting subscriber and `info()` caller with `err`. + fn fail(self, err: &Error) { + fail_resolvers(self.resolvers, err); + for slot in self.info_resolvers { + if let Ok(mut slot) = slot.write() { + *slot = Some(Err(err.clone())); + } + } + } } /// Resolve every waiting subscriber with `err`. @@ -71,7 +94,7 @@ struct State { requests: HashMap, // Requested names in FIFO order for the dynamic handler to drain. A name - // stays in `requests` (but not here) once handed out as a `TrackRequest`. + // stays in `requests` (but not here) once handed out as a `PendingTrack`. request_order: VecDeque, // The current number of dynamic producers. @@ -99,19 +122,21 @@ impl State { Ok(()) } - /// Drop every pending request, notifying all waiting subscribers with `err`. + /// Drop every pending request, notifying all waiting subscribers and `info()` + /// callers with `err`. fn abort_requests(&mut self, err: &Error) { self.request_order.clear(); for (_, pending) in self.requests.drain() { - fail_resolvers(pending.resolvers, err); + pending.fail(err); } } - /// Drop a single named pending request, notifying its subscribers with `err`. + /// Drop a single named pending request, notifying its subscribers and `info()` + /// callers with `err`. fn deny_request(&mut self, name: &str, err: Error) { self.request_order.retain(|n| n != name); if let Some(pending) = self.requests.remove(name) { - fail_resolvers(pending.resolvers, &err); + pending.fail(&err); } } } @@ -250,25 +275,26 @@ impl BroadcastProducer { /// then either [`Self::accept`]s it with a concrete [`Track`], which resolves all /// waiting subscribers, or [`Self::deny`]s it. Dropping without doing either /// denies with [`Error::Cancel`]. -pub struct TrackRequest { +pub struct PendingTrack { name: String, - subscription: Subscription, + subscription: Option, state: kio::Weak, /// Set once accepted or denied so [`Drop`] doesn't deny a second time. completed: bool, } -impl TrackRequest { +impl PendingTrack { /// The requested track name. pub fn name(&self) -> &str { &self.name } - /// The first waiting subscriber's preferences, as a hint for constructing the - /// [`Track`]. The full aggregate is available on the [`TrackProducer`] returned - /// by [`Self::accept`] via [`TrackProducer::subscription`]. - pub fn subscription(&self) -> &Subscription { - &self.subscription + /// The first waiting subscriber's preferences, or `None` when only `info()` + /// callers are waiting (no group demand). A handler uses this to decide whether + /// to open an upstream subscription. The full aggregate is available on the + /// [`TrackProducer`] returned by [`Self::accept`]. + pub fn subscription(&self) -> Option { + self.subscription.clone() } /// Serve the request with the given track, resolving every waiting subscriber. @@ -289,11 +315,19 @@ impl TrackRequest { let pending = state.requests.remove(&self.name).ok_or(Error::Cancel)?; state.request_order.retain(|n| n != &self.name); + let info = track.clone(); let producer = TrackProducer::new(track); // Insert a weak reference so future subscribers dedupe onto this producer. state.tracks.insert(self.name.clone(), producer.weak()); + // Wake any `info()` callers with the resolved properties. + for slot in pending.info_resolvers { + if let Ok(mut slot) = slot.write() { + *slot = Some(Ok(info.clone())); + } + } + // Hand each waiting subscriber a consumer carrying its own preferences. // Building it here (not when the subscriber polls) means it counts toward // the producer immediately, so a publisher checking `unused()` right after @@ -342,7 +376,7 @@ impl TrackRequest { } } -impl Drop for TrackRequest { +impl Drop for PendingTrack { fn drop(&mut self) { if !self.completed && let Ok(mut state) = self.state.write() @@ -419,12 +453,76 @@ impl std::future::Future for TrackPending { } } +/// A pending info lookup returned by [`TrackConsumer::info`]. +/// +/// Resolves to the track's immutable [`Track`] once it's known: synchronously if a +/// producer already exists, otherwise once the dynamic handler accepts the +/// coalesced request. Implements [`Future`], so `.await` it; poll-based callers can +/// drive it with [`Self::poll_info`]. +pub struct InfoPending { + inner: InfoPendingInner, + /// Kept alive between `Future::poll` calls so its resolver-channel registration + /// stays valid until the next poll replaces it. + waiter: Option, +} + +enum InfoPendingInner { + /// Resolved synchronously: a producer existed, or it failed immediately. + Ready(Result), + /// Waiting for the dynamic handler to accept or deny. + Waiting(kio::Consumer), +} + +impl InfoPending { + fn ready(result: Result) -> Self { + Self { + inner: InfoPendingInner::Ready(result), + waiter: None, + } + } + + fn waiting(consumer: kio::Consumer) -> Self { + Self { + inner: InfoPendingInner::Waiting(consumer), + waiter: None, + } + } + + /// Poll for the resolved [`Track`], without blocking. + pub fn poll_info(&self, waiter: &kio::Waiter) -> Poll> { + match &self.inner { + InfoPendingInner::Ready(result) => Poll::Ready(result.clone()), + InfoPendingInner::Waiting(consumer) => match consumer.poll(waiter, |slot| match &**slot { + Some(result) => Poll::Ready(result.clone()), + None => Poll::Pending, + }) { + Poll::Ready(Ok(result)) => Poll::Ready(result), + Poll::Ready(Err(closed)) => Poll::Ready(match &*closed { + Some(result) => result.clone(), + None => Err(Error::Cancel), + }), + Poll::Pending => Poll::Pending, + }, + } + } +} + +impl std::future::Future for InfoPending { + type Output = Result; + + fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { + let this = self.get_mut(); + this.waiter = Some(kio::Waiter::new(cx.waker().clone())); + this.poll_info(this.waiter.as_ref().unwrap()) + } +} + /// Handles on-demand track creation for a broadcast. /// /// When a consumer requests a track that doesn't exist, the dynamic producer /// picks up the request via [`Self::requested_track`] and either -/// [`TrackRequest::accept`]s it with a concrete [`Track`] or -/// [`TrackRequest::deny`]s it. Dropped when no longer needed; pending requests +/// [`PendingTrack::accept`]s it with a concrete [`Track`] or +/// [`PendingTrack::deny`]s it. Dropped when no longer needed; pending requests /// are automatically aborted. pub struct BroadcastDynamic { info: Broadcast, @@ -478,20 +576,21 @@ impl BroadcastDynamic { } /// Poll for the next consumer-requested track, without blocking. - pub fn poll_requested_track(&mut self, waiter: &kio::Waiter) -> Poll> { + pub fn poll_requested_track(&mut self, waiter: &kio::Waiter) -> Poll> { let weak = self.state.weak(); self.poll(waiter, |state| { let Some(name) = state.request_order.pop_front() else { return Poll::Pending; }; - // The name stays in `requests` so concurrent subscribers can still - // coalesce onto it until the publisher accepts or denies. + // The name stays in `requests` so concurrent subscribers (and info() + // callers) can still coalesce onto it until the publisher accepts or denies. let pending = state.requests.get(&name).expect("request_order out of sync"); - let subscription = pending.resolvers.first().map(|(s, _)| s.clone()).unwrap_or_default(); + // `None` when only `info()` callers are waiting (no group demand yet). + let subscription = pending.resolvers.first().map(|(s, _)| s.clone()); Poll::Ready((name, subscription)) }) .map(|res| { - res.map(|(name, subscription)| TrackRequest { + res.map(|(name, subscription)| PendingTrack { name, subscription, state: weak, @@ -500,8 +599,8 @@ impl BroadcastDynamic { }) } - /// Block until a consumer requests a track, returning a [`TrackRequest`] to serve. - pub async fn requested_track(&mut self) -> Result { + /// Block until a consumer requests a track, returning a [`PendingTrack`] to serve. + pub async fn requested_track(&mut self) -> Result { kio::wait(|waiter| self.poll_requested_track(waiter)).await } @@ -562,7 +661,7 @@ use futures::FutureExt; #[cfg(test)] impl BroadcastDynamic { - pub fn assert_request(&mut self) -> TrackRequest { + pub fn assert_request(&mut self) -> PendingTrack { self.requested_track() .now_or_never() .expect("should not have blocked") @@ -608,7 +707,7 @@ impl BroadcastConsumer { /// /// Reuses a live producer if one is already publishing the track (the pending /// resolves right away), otherwise queues a dynamic request served via - /// [`BroadcastDynamic::requested_track`] and [`TrackRequest::accept`] (for the + /// [`BroadcastDynamic::requested_track`] and [`PendingTrack::accept`] (for the /// wire this is SUBSCRIBE_OK). Resolves to [`Error::NotFound`] if no dynamic /// producer exists to handle the request. fn request_subscribe(&self, name: &str, subscription: Subscription) -> TrackPending { @@ -644,6 +743,7 @@ impl BroadcastConsumer { name.to_string(), PendingRequest { resolvers: vec![(subscription, slot)], + ..Default::default() }, ); state.request_order.push_back(name.to_string()); @@ -652,6 +752,53 @@ impl BroadcastConsumer { TrackPending::waiting(consumer) } + /// Resolve a track's immutable [`Track`] info, returning an [`InfoPending`]. + /// + /// Returns immediately when a live producer already carries it (warm via + /// `tracks`); otherwise coalesces onto the track's dynamic request (alongside + /// any subscribers) and resolves when the handler [`PendingTrack::accept`]s it. + /// Resolves to [`Error::NotFound`] if no dynamic producer exists to handle it. + /// An `info()` caller adds no subscription. + fn request_info(&self, name: &str) -> InfoPending { + let Some(producer) = self.state.produce() else { + let err = self.state.read().abort.clone().unwrap_or(Error::Dropped); + return InfoPending::ready(Err(err)); + }; + let mut state = match modify(&producer) { + Ok(state) => state, + Err(err) => return InfoPending::ready(Err(err)), + }; + + // A live producer carries the info; return it without a round trip. + if let Some(weak) = state.tracks.get(name) { + if !weak.is_closed() { + return InfoPending::ready(Ok(weak.info.clone())); + } + state.tracks.remove(name); + } + + let slot = kio::Producer::new(None); + let consumer = slot.consume(); + + if let Some(pending) = state.requests.get_mut(name) { + // Coalesce onto an in-flight request (subscribe or info) for the same name. + pending.info_resolvers.push(slot); + } else if state.dynamic == 0 { + return InfoPending::ready(Err(Error::NotFound)); + } else { + state.requests.insert( + name.to_string(), + PendingRequest { + info_resolvers: vec![slot], + ..Default::default() + }, + ); + state.request_order.push_back(name.to_string()); + } + + InfoPending::waiting(consumer) + } + /// Block until the broadcast is closed and return the cause. /// /// Returns [`Error::Dropped`] if every producer was dropped without an @@ -715,6 +862,16 @@ impl TrackConsumer { self.broadcast .request_subscribe(&self.name, subscription.into().unwrap_or_default()) } + + /// Resolve this track's immutable [`Track`] properties without subscribing. + /// + /// Returns an [`InfoPending`] that resolves to the publisher's properties + /// (timescale, compression, cache, ...). `.await` it, or drive it with + /// [`InfoPending::poll_info`] from a poll loop. Warm (a live producer exists) it + /// resolves with no round trip. + pub fn info(&self) -> InfoPending { + self.broadcast.request_info(&self.name) + } } #[cfg(test)] @@ -940,4 +1097,52 @@ mod test { // instead of failing with NotFound. let _fut = subscribe_pending!(consumer, "track1"); } + + #[tokio::test] + async fn info_warm_from_live_producer() { + let mut producer = Broadcast::new().produce(); + let track = Track::new("video").with_timescale(crate::Timescale::MICRO).produce(); + producer.assert_insert_track(&track); + let consumer = producer.consume(); + + // A live producer carries the info, so info() resolves with no round trip. + let got = consumer + .consume_track("video") + .info() + .now_or_never() + .expect("warm info should not block") + .unwrap(); + assert_eq!(got.timescale, Some(crate::Timescale::MICRO)); + } + + #[tokio::test] + async fn info_cold_resolves_via_accept() { + let mut producer = Broadcast::new().produce().dynamic(); + let consumer = producer.consume(); + let consumer2 = consumer.clone(); + + // No producer yet: two info() callers coalesce onto one request (no subscriber). + let info1 = consumer.consume_track("video").info(); + assert!(info1.poll_info(&kio::Waiter::noop()).is_pending()); + let info2 = consumer2.consume_track("video").info(); + + let request = producer.assert_request(); + assert_eq!(request.name(), "video"); + request + .accept(Track::new("video").with_timescale(crate::Timescale::MICRO)) + .unwrap(); + + assert_eq!(info1.await.unwrap().timescale, Some(crate::Timescale::MICRO)); + // Second caller and any later lookup read the live producer, no new request. + assert_eq!(info2.await.unwrap().timescale, Some(crate::Timescale::MICRO)); + producer.assert_no_request(); + } + + #[tokio::test] + async fn info_missing_without_dynamic() { + let producer = Broadcast::new().produce(); + let consumer = producer.consume(); + let err = consumer.consume_track("video").info().await.unwrap_err(); + assert!(matches!(err, Error::NotFound)); + } } diff --git a/rs/moq-net/src/model/track.rs b/rs/moq-net/src/model/track.rs index 4c2f6588c..0dd41bea3 100644 --- a/rs/moq-net/src/model/track.rs +++ b/rs/moq-net/src/model/track.rs @@ -487,10 +487,154 @@ fn max_option(a: Option, b: Option) -> Option { } } +/// A track producer whose immutable [`Track`] properties aren't known yet. +/// +/// Holds the shared state (subscriptions, demand, groups) but no `info`. This is +/// the headless half of a [`TrackProducer`]: a relay can accept subscribers and +/// observe demand ([`Self::subscription`]) on it before the properties are +/// resolved, then [`Self::accept`] the resolved [`Track`] to upgrade it into a +/// full producer that can create groups. +#[derive(Default, Clone)] +pub struct TrackRequest { + state: kio::Producer, +} + +impl TrackRequest { + /// Build an empty request with no consumers yet. + pub fn new() -> Self { + Self::default() + } + + /// Resolve the track's immutable properties, upgrading into a [`TrackProducer`]. + pub fn accept(self, info: Track) -> TrackProducer { + TrackProducer { request: self, info } + } + + /// Mark the track as finished after the last appended group. + /// + /// Sets the final sequence to one past the current max_sequence. + /// No new groups at or above this sequence can be appended. + /// NOTE: Old groups with lower sequence numbers can still arrive. + pub fn finish(&mut self) -> Result<()> { + let mut state = self.modify()?; + if state.final_sequence.is_some() { + return Err(Error::Closed); + } + state.final_sequence = Some(match state.max_sequence { + Some(max) => max.checked_add(1).ok_or(coding::BoundsExceeded)?, + None => 0, + }); + Ok(()) + } + + /// Mark the track as finished at an exact final sequence. + /// + /// The caller must pass the current max_sequence exactly. + /// Freezes the final boundary at one past the current max_sequence. + /// No new groups at or above that sequence can be created. + /// NOTE: Old groups with lower sequence numbers can still arrive. + pub fn finish_at(&mut self, sequence: u64) -> Result<()> { + let mut state = self.modify()?; + let max = state.max_sequence.ok_or(Error::Closed)?; + if state.final_sequence.is_some() || sequence != max { + return Err(Error::Closed); + } + state.final_sequence = Some(max.checked_add(1).ok_or(coding::BoundsExceeded)?); + Ok(()) + } + + /// Abort the track with the given error. + /// + /// Child groups are independent and must be aborted separately if desired; + /// existing group consumers can still finish reading any groups that were + /// already created. + pub fn abort(&mut self, err: Error) -> Result<()> { + let mut guard = self.modify()?; + guard.abort = Some(err); + guard.close(); + Ok(()) + } + + /// The aggregate of every live subscriber's [`Subscription`] (most demanding + /// request across all consumers), or `None` when there are no live subscribers. + pub fn subscription(&self) -> Option { + self.state + .write() + .ok() + .and_then(|mut state| state.aggregate_subscription()) + } + + /// Block until there are no active consumers. + pub async fn unused(&self) -> Result<()> { + self.state + .unused() + .await + .map_err(|r| r.abort.clone().unwrap_or(Error::Dropped)) + } + + /// Block until there is at least one active consumer. + pub async fn used(&self) -> Result<()> { + self.state + .used() + .await + .map_err(|r| r.abort.clone().unwrap_or(Error::Dropped)) + } + + /// Block until the track is closed or aborted, returning the cause. + pub async fn closed(&self) -> Error { + self.state.closed().await; + self.state.read().abort.clone().unwrap_or(Error::Dropped) + } + + /// Return true if the track has been closed. + pub fn is_closed(&self) -> bool { + self.state.read().is_closed() + } + + /// Return the latest sequence number successfully appended to the track. + pub fn latest(&self) -> Option { + self.state.read().max_sequence + } + + /// Return true if this is the same track. + pub fn is_clone(&self, other: &Self) -> bool { + self.state.same_channel(&other.state) + } + + /// Register a subscription `Arc` and build the consumer once `info` is known. + fn subscribe_with(&self, info: &Track, subscription: impl Into>) -> TrackSubscriber { + let mut subscription = subscription.into().unwrap_or_default(); + subscription.stale = info.clamp_stale(subscription.stale); + let subscription = Arc::new(Mutex::new(subscription)); + if let Ok(mut state) = self.state.write() { + state.subscriptions.push(Arc::downgrade(&subscription)); + } + TrackSubscriber { + info: info.clone(), + state: self.state.consume(), + subscription, + index: 0, + min_sequence: 0, + next_sequence: 0, + end_sequence: None, + } + } + + fn modify(&self) -> Result> { + self.state + .write() + .map_err(|r| r.abort.clone().unwrap_or(Error::Dropped)) + } +} + /// A producer for a track, used to create new groups. +/// +/// A [`TrackRequest`] plus the resolved immutable [`Track`] properties. Derefs to +/// the [`Track`], so `producer.name` / `producer.timescale` etc. read the resolved +/// values. pub struct TrackProducer { + request: TrackRequest, info: Track, - state: kio::Producer, } impl std::ops::Deref for TrackProducer { @@ -505,8 +649,8 @@ impl TrackProducer { /// Build a producer for the given track metadata. Prefer [`Track::produce`]. pub fn new(info: Track) -> Self { Self { + request: TrackRequest::new(), info, - state: kio::Producer::default(), } } @@ -514,7 +658,7 @@ impl TrackProducer { pub fn create_group(&mut self, info: Group) -> Result { let group = GroupProducer::new_with_timescale(info, self.info.timescale); - let mut state = self.modify()?; + let mut state = self.request.modify()?; if let Some(fin) = state.final_sequence && group.sequence >= fin { @@ -535,7 +679,7 @@ impl TrackProducer { /// Create a new group with the next sequence number. pub fn append_group(&mut self) -> Result { - let mut state = self.modify()?; + let mut state = self.request.modify()?; let sequence = match state.max_sequence { Some(s) => s.checked_add(1).ok_or(coding::BoundsExceeded)?, None => 0, @@ -565,140 +709,79 @@ impl TrackProducer { Ok(()) } - /// Mark the track as finished after the last appended group. - /// - /// Sets the final sequence to one past the current max_sequence. - /// No new groups at or above this sequence can be appended. - /// NOTE: Old groups with lower sequence numbers can still arrive. - pub fn finish(&mut self) -> Result<()> { - let mut state = self.modify()?; - if state.final_sequence.is_some() { - return Err(Error::Closed); - } - state.final_sequence = Some(match state.max_sequence { - Some(max) => max.checked_add(1).ok_or(coding::BoundsExceeded)?, - None => 0, - }); - Ok(()) - } - - /// Mark the track as finished at an exact final sequence. - /// - /// The caller must pass the current max_sequence exactly. - /// Freezes the final boundary at one past the current max_sequence. - /// No new groups at or above that sequence can be created. - /// NOTE: Old groups with lower sequence numbers can still arrive. - pub fn finish_at(&mut self, sequence: u64) -> Result<()> { - let mut state = self.modify()?; - let max = state.max_sequence.ok_or(Error::Closed)?; - if state.final_sequence.is_some() || sequence != max { - return Err(Error::Closed); - } - state.final_sequence = Some(max.checked_add(1).ok_or(coding::BoundsExceeded)?); - Ok(()) - } - - /// Abort the track with the given error. - /// - /// Child groups are independent and must be aborted separately if desired; - /// existing group consumers can still finish reading any groups that were - /// already created. - pub fn abort(&mut self, err: Error) -> Result<()> { - let mut guard = self.modify()?; - guard.abort = Some(err); - guard.close(); - Ok(()) - } - /// Subscribe to the track in-process with the given subscriber preferences. /// /// Pass `None` for [`Subscription::default`]. The preferences feed the /// producer's [`Self::subscription`] aggregate and can be changed later via /// [`TrackSubscriber::update`]. pub fn subscribe(&self, subscription: impl Into>) -> TrackSubscriber { - let mut subscription = subscription.into().unwrap_or_default(); - subscription.stale = self.info.clamp_stale(subscription.stale); - let subscription = Arc::new(Mutex::new(subscription)); - if let Ok(mut state) = self.state.write() { - state.subscriptions.push(Arc::downgrade(&subscription)); - } - TrackSubscriber { + self.request.subscribe_with(&self.info, subscription) + } + + /// Create a weak reference that doesn't prevent auto-close. + pub(crate) fn weak(&self) -> TrackWeak { + TrackWeak { info: self.info.clone(), - state: self.state.consume(), - subscription, - index: 0, - min_sequence: 0, - next_sequence: 0, - end_sequence: None, + state: self.request.state.weak(), } } - /// The aggregate of every live subscriber's [`Subscription`] (most demanding - /// request across all consumers), or `None` when there are no live subscribers. + /// Mark the track as finished after the last appended group. See [`TrackRequest::finish`]. + pub fn finish(&mut self) -> Result<()> { + self.request.finish() + } + + /// Mark the track as finished at an exact final sequence. See [`TrackRequest::finish_at`]. + pub fn finish_at(&mut self, sequence: u64) -> Result<()> { + self.request.finish_at(sequence) + } + + /// Abort the track with the given error. See [`TrackRequest::abort`]. + pub fn abort(&mut self, err: Error) -> Result<()> { + self.request.abort(err) + } + + /// The aggregate of every live subscriber's [`Subscription`]. See [`TrackRequest::subscription`]. pub fn subscription(&self) -> Option { - self.state - .write() - .ok() - .and_then(|mut state| state.aggregate_subscription()) + self.request.subscription() } /// Block until there are no active consumers. pub async fn unused(&self) -> Result<()> { - self.state - .unused() - .await - .map_err(|r| r.abort.clone().unwrap_or(Error::Dropped)) + self.request.unused().await } /// Block until there is at least one active consumer. pub async fn used(&self) -> Result<()> { - self.state - .used() - .await - .map_err(|r| r.abort.clone().unwrap_or(Error::Dropped)) + self.request.used().await } /// Block until the track is closed or aborted, returning the cause. pub async fn closed(&self) -> Error { - self.state.closed().await; - self.state.read().abort.clone().unwrap_or(Error::Dropped) + self.request.closed().await } /// Return true if the track has been closed. pub fn is_closed(&self) -> bool { - self.state.read().is_closed() + self.request.is_closed() } /// Return the latest sequence number successfully appended to the track. pub fn latest(&self) -> Option { - self.state.read().max_sequence + self.request.latest() } /// Return true if this is the same track. pub fn is_clone(&self, other: &Self) -> bool { - self.state.same_channel(&other.state) - } - - /// Create a weak reference that doesn't prevent auto-close. - pub(crate) fn weak(&self) -> TrackWeak { - TrackWeak { - info: self.info.clone(), - state: self.state.weak(), - } - } - - fn modify(&self) -> Result> { - self.state - .write() - .map_err(|r| r.abort.clone().unwrap_or(Error::Dropped)) + self.request.is_clone(&other.request) } } impl Clone for TrackProducer { fn clone(&self) -> Self { Self { + request: self.request.clone(), info: self.info.clone(), - state: self.state.clone(), } } } @@ -1034,7 +1117,7 @@ mod test { producer.append_group().unwrap(); // seq 2 { - let state = producer.state.read(); + let state = producer.request.state.read(); assert_eq!(live_groups(&state), 3); assert_eq!(state.offset, 0); } @@ -1048,7 +1131,7 @@ mod test { // Groups 0, 1, 2 are expired but seq 3 (max_sequence) is kept. // Leading tombstones are trimmed, so only seq 3 remains. { - let state = producer.state.read(); + let state = producer.request.state.read(); assert_eq!(live_groups(&state), 1); assert_eq!(first_live_sequence(&state), 3); assert_eq!(state.offset, 3); @@ -1073,7 +1156,7 @@ mod test { producer.append_group().unwrap(); // seq 1 { - let state = producer.state.read(); + let state = producer.request.state.read(); assert_eq!(live_groups(&state), 1); assert_eq!(first_live_sequence(&state), 1); assert_eq!(state.offset, 1); @@ -1090,7 +1173,7 @@ mod test { producer.append_group().unwrap(); // seq 2 { - let state = producer.state.read(); + let state = producer.request.state.read(); assert_eq!(live_groups(&state), 3); assert_eq!(state.offset, 0); } @@ -1126,7 +1209,7 @@ mod test { producer.append_group().unwrap(); // seq 1 // Seq 0 is gone because the publisher only keeps groups for 1s. - let state = producer.state.read(); + let state = producer.request.state.read(); assert_eq!(live_groups(&state), 1); assert_eq!(first_live_sequence(&state), 1); } @@ -1161,7 +1244,7 @@ mod test { // max_sequence = 5, which is at the front of the VecDeque. { - let state = producer.state.read(); + let state = producer.request.state.read(); assert_eq!(state.max_sequence, Some(5)); } @@ -1174,7 +1257,7 @@ mod test { // Seq 3, 4, 5 are all expired. Seq 5 was the old max_sequence but now 6 is. // All old groups are evicted. { - let state = producer.state.read(); + let state = producer.request.state.read(); assert_eq!(live_groups(&state), 1); assert_eq!(first_live_sequence(&state), 6); assert!(!state.duplicates.contains(&3)); @@ -1201,7 +1284,7 @@ mod test { // Seq 5 is max_sequence (protected). Seq 3 is not expired (just created). // Nothing should be evicted. { - let state = producer.state.read(); + let state = producer.request.state.read(); assert_eq!(live_groups(&state), 2); assert_eq!(state.offset, 0); } @@ -1217,7 +1300,7 @@ mod test { // Seq 2 is fresh → kept. // VecDeque: [Some(5), None, Some(2)]. Leading entry is Some, so offset stays. { - let state = producer.state.read(); + let state = producer.request.state.read(); assert_eq!(live_groups(&state), 2); assert_eq!(state.offset, 0); assert!(state.duplicates.contains(&5)); @@ -1262,7 +1345,7 @@ mod test { assert!(producer.finish_at(5).is_ok()); { - let state = producer.state.read(); + let state = producer.request.state.read(); assert_eq!(state.final_sequence, Some(6)); } @@ -1722,7 +1805,7 @@ mod test { fn append_group_returns_bounds_exceeded_on_sequence_overflow() { let mut producer = Track::new("test").produce(); { - let mut state = producer.state.write().ok().unwrap(); + let mut state = producer.request.state.write().ok().unwrap(); state.max_sequence = Some(u64::MAX); }