diff --git a/js/net/src/lite/fetch.test.ts b/js/net/src/lite/fetch.test.ts new file mode 100644 index 000000000..47aef2154 --- /dev/null +++ b/js/net/src/lite/fetch.test.ts @@ -0,0 +1,54 @@ +import { expect, test } from "bun:test"; +import * as Path from "../path.ts"; +import { Reader, Writer } from "../stream.ts"; +import { Fetch } from "./fetch.ts"; +import { Version } from "./version.ts"; + +function concat(chunks: Uint8Array[]): Uint8Array { + const total = chunks.reduce((sum, c) => sum + c.byteLength, 0); + const out = new Uint8Array(total); + let offset = 0; + for (const c of chunks) { + out.set(c, offset); + offset += c.byteLength; + } + return out; +} + +async function encode(version: Version, fetch: Fetch): Promise { + const written: Uint8Array[] = []; + const writer = new Writer( + new WritableStream({ write: (chunk) => void written.push(new Uint8Array(chunk)) }), + ); + await fetch.encode(writer, version); + writer.close(); + await writer.closed; + return concat(written); +} + +async function roundtrip(version: Version, fetch: Fetch): Promise { + const reader = new Reader(undefined, await encode(version, fetch)); + return Fetch.decode(reader, version); +} + +function sample(): Fetch { + return new Fetch(Path.from("room/1"), "video", 3, 42, 7); +} + +test("Fetch: frameStart round-trips on draft-05", async () => { + const got = await roundtrip(Version.DRAFT_05_WIP, sample()); + expect(got.group).toBe(42); + expect(got.frameStart).toBe(7); +}); + +test("Fetch: frameStart is absent before draft-05", async () => { + // draft-03/draft-04 don't carry the frame start varint, so it decodes to 0. + const got = await roundtrip(Version.DRAFT_04, sample()); + expect(got.group).toBe(42); + expect(got.frameStart).toBe(0); + + // The draft-04 encoding is strictly shorter (no trailing frame start varint). + const buf04 = await encode(Version.DRAFT_04, sample()); + const buf05 = await encode(Version.DRAFT_05_WIP, sample()); + expect(buf05.byteLength).toBeGreaterThan(buf04.byteLength); +}); diff --git a/js/net/src/lite/fetch.ts b/js/net/src/lite/fetch.ts index 6ca8863cd..f3f7d3c92 100644 --- a/js/net/src/lite/fetch.ts +++ b/js/net/src/lite/fetch.ts @@ -18,36 +18,63 @@ export class Fetch { track: string; priority: number; group: number; + /** + * The 0-based index of the first frame to return; the publisher skips all + * earlier frames. `0` returns the entire group. Draft-05+ only; older drafts + * always return the whole group. + */ + frameStart: number; - constructor(broadcast: Path.Valid, track: string, priority: number, group: number) { + constructor(broadcast: Path.Valid, track: string, priority: number, group: number, frameStart = 0) { this.broadcast = broadcast; this.track = track; this.priority = priority; this.group = group; + this.frameStart = frameStart; } - async #encode(w: Writer) { + async #encode(w: Writer, version: Version) { await w.string(this.broadcast); await w.string(this.track); await w.u8(this.priority); await w.u53(this.group); + + switch (version) { + case Version.DRAFT_03: + case Version.DRAFT_04: + break; + default: + await w.u53(this.frameStart); + break; + } } - static async #decode(r: Reader): Promise { + static async #decode(r: Reader, version: Version): Promise { const broadcast = Path.from(await r.string()); const track = await r.string(); const priority = await r.u8(); const group = await r.u53(); - return new Fetch(broadcast, track, priority, group); + + let frameStart = 0; + switch (version) { + case Version.DRAFT_03: + case Version.DRAFT_04: + break; + default: + frameStart = await r.u53(); + break; + } + + return new Fetch(broadcast, track, priority, group, frameStart); } async encode(w: Writer, version: Version): Promise { guardFetch(version); - return Message.encode(w, this.#encode.bind(this)); + return Message.encode(w, (w) => this.#encode(w, version)); } static async decode(r: Reader, version: Version): Promise { guardFetch(version); - return Message.decode(r, Fetch.#decode); + return Message.decode(r, (r) => Fetch.#decode(r, version)); } } diff --git a/rs/moq-net/src/lite/fetch.rs b/rs/moq-net/src/lite/fetch.rs index 04281e65f..4af8229b6 100644 --- a/rs/moq-net/src/lite/fetch.rs +++ b/rs/moq-net/src/lite/fetch.rs @@ -17,6 +17,10 @@ pub struct Fetch<'a> { pub track: Cow<'a, str>, pub priority: u8, pub group: u64, + /// The 0-based index of the first frame to return; the publisher skips all + /// earlier frames. `0` returns the entire group. Lite05+ only; older drafts + /// always return the whole group. + pub frame_start: u64, } impl Message for Fetch<'_> { @@ -33,11 +37,17 @@ impl Message for Fetch<'_> { let priority = u8::decode(r, version)?; let group = u64::decode(r, version)?; + let frame_start = match version { + Version::Lite03 | Version::Lite04 => 0, + _ => u64::decode(r, version)?, + }; + Ok(Self { broadcast, track, priority, group, + frame_start, }) } @@ -53,6 +63,56 @@ impl Message for Fetch<'_> { self.track.encode(w, version)?; self.priority.encode(w, version)?; self.group.encode(w, version)?; + + match version { + Version::Lite03 | Version::Lite04 => {} + _ => self.frame_start.encode(w, version)?, + } + Ok(()) } } + +#[cfg(test)] +mod tests { + use super::*; + + fn sample() -> Fetch<'static> { + Fetch { + broadcast: Path::new("room/1"), + track: Cow::Borrowed("video"), + priority: 3, + group: 42, + frame_start: 7, + } + } + + fn roundtrip(version: Version, fetch: &Fetch<'_>) -> Fetch<'static> { + let mut buf = Vec::new(); + fetch.encode_msg(&mut buf, version).unwrap(); + let mut r = buf.as_slice(); + Fetch::decode_msg(&mut r, version).unwrap() + } + + #[test] + fn frame_start_roundtrips_on_lite05() { + let got = roundtrip(Version::Lite05Wip, &sample()); + assert_eq!(got.group, 42); + assert_eq!(got.frame_start, 7); + } + + #[test] + fn frame_start_absent_before_lite05() { + // Lite03/Lite04 don't carry the frame start varint, so it always decodes as 0. + let got = roundtrip(Version::Lite04, &sample()); + assert_eq!(got.group, 42); + assert_eq!(got.frame_start, 0); + + // The lite-04 encoding is strictly shorter (no trailing frame start varint). + let mut buf04 = Vec::new(); + sample().encode_msg(&mut buf04, Version::Lite04).unwrap(); + let mut buf05 = Vec::new(); + sample().encode_msg(&mut buf05, Version::Lite05Wip).unwrap(); + assert!(buf05.len() > buf04.len(), "lite-05 carries an extra frame start varint"); + } +}