Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions js/net/src/lite/fetch.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import { expect, test } from "bun:test";
import * as Path from "../path.ts";
import { Reader, Writer } from "../stream.ts";
import { Fetch } from "./fetch.ts";
import { Version } from "./version.ts";

function concat(chunks: Uint8Array[]): Uint8Array {
const total = chunks.reduce((sum, c) => sum + c.byteLength, 0);
const out = new Uint8Array(total);
let offset = 0;
for (const c of chunks) {
out.set(c, offset);
offset += c.byteLength;
}
return out;
}

async function encode(version: Version, fetch: Fetch): Promise<Uint8Array> {
const written: Uint8Array[] = [];
const writer = new Writer(
new WritableStream<Uint8Array>({ write: (chunk) => void written.push(new Uint8Array(chunk)) }),
);
await fetch.encode(writer, version);
writer.close();
await writer.closed;
return concat(written);
}

async function roundtrip(version: Version, fetch: Fetch): Promise<Fetch> {
const reader = new Reader(undefined, await encode(version, fetch));
return Fetch.decode(reader, version);
}

function sample(): Fetch {
return new Fetch(Path.from("room/1"), "video", 3, 42, 7);
}

test("Fetch: frameStart round-trips on draft-05", async () => {
const got = await roundtrip(Version.DRAFT_05_WIP, sample());
expect(got.group).toBe(42);
expect(got.frameStart).toBe(7);
});

test("Fetch: frameStart is absent before draft-05", async () => {
// draft-03/draft-04 don't carry the frame start varint, so it decodes to 0.
const got = await roundtrip(Version.DRAFT_04, sample());
expect(got.group).toBe(42);
expect(got.frameStart).toBe(0);

// The draft-04 encoding is strictly shorter (no trailing frame start varint).
const buf04 = await encode(Version.DRAFT_04, sample());
const buf05 = await encode(Version.DRAFT_05_WIP, sample());
expect(buf05.byteLength).toBeGreaterThan(buf04.byteLength);
});
39 changes: 33 additions & 6 deletions js/net/src/lite/fetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,36 +18,63 @@ export class Fetch {
track: string;
priority: number;
group: number;
/**
* The 0-based index of the first frame to return; the publisher skips all
* earlier frames. `0` returns the entire group. Draft-05+ only; older drafts
* always return the whole group.
*/
frameStart: number;

constructor(broadcast: Path.Valid, track: string, priority: number, group: number) {
constructor(broadcast: Path.Valid, track: string, priority: number, group: number, frameStart = 0) {
this.broadcast = broadcast;
this.track = track;
this.priority = priority;
this.group = group;
this.frameStart = frameStart;
}

async #encode(w: Writer) {
async #encode(w: Writer, version: Version) {
await w.string(this.broadcast);
await w.string(this.track);
await w.u8(this.priority);
await w.u53(this.group);

switch (version) {
case Version.DRAFT_03:
case Version.DRAFT_04:
break;
default:
await w.u53(this.frameStart);
break;
}
}

static async #decode(r: Reader): Promise<Fetch> {
static async #decode(r: Reader, version: Version): Promise<Fetch> {
const broadcast = Path.from(await r.string());
const track = await r.string();
const priority = await r.u8();
const group = await r.u53();
return new Fetch(broadcast, track, priority, group);

let frameStart = 0;
switch (version) {
case Version.DRAFT_03:
case Version.DRAFT_04:
break;
default:
frameStart = await r.u53();
break;
}

return new Fetch(broadcast, track, priority, group, frameStart);
}

async encode(w: Writer, version: Version): Promise<void> {
guardFetch(version);
return Message.encode(w, this.#encode.bind(this));
return Message.encode(w, (w) => this.#encode(w, version));
}

static async decode(r: Reader, version: Version): Promise<Fetch> {
guardFetch(version);
return Message.decode(r, Fetch.#decode);
return Message.decode(r, (r) => Fetch.#decode(r, version));
}
}
60 changes: 60 additions & 0 deletions rs/moq-net/src/lite/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ pub struct Fetch<'a> {
pub track: Cow<'a, str>,
pub priority: u8,
pub group: u64,
/// The 0-based index of the first frame to return; the publisher skips all
/// earlier frames. `0` returns the entire group. Lite05+ only; older drafts
/// always return the whole group.
pub frame_start: u64,
}

impl Message for Fetch<'_> {
Expand All @@ -33,11 +37,17 @@ impl Message for Fetch<'_> {
let priority = u8::decode(r, version)?;
let group = u64::decode(r, version)?;

let frame_start = match version {
Version::Lite03 | Version::Lite04 => 0,
_ => u64::decode(r, version)?,
};

Ok(Self {
broadcast,
track,
priority,
group,
frame_start,
})
}

Expand All @@ -53,6 +63,56 @@ impl Message for Fetch<'_> {
self.track.encode(w, version)?;
self.priority.encode(w, version)?;
self.group.encode(w, version)?;

match version {
Version::Lite03 | Version::Lite04 => {}
_ => self.frame_start.encode(w, version)?,
}

Ok(())
}
}

#[cfg(test)]
mod tests {
use super::*;

fn sample() -> Fetch<'static> {
Fetch {
broadcast: Path::new("room/1"),
track: Cow::Borrowed("video"),
priority: 3,
group: 42,
frame_start: 7,
}
}

fn roundtrip(version: Version, fetch: &Fetch<'_>) -> Fetch<'static> {
let mut buf = Vec::new();
fetch.encode_msg(&mut buf, version).unwrap();
let mut r = buf.as_slice();
Fetch::decode_msg(&mut r, version).unwrap()
}

#[test]
fn frame_start_roundtrips_on_lite05() {
let got = roundtrip(Version::Lite05Wip, &sample());
assert_eq!(got.group, 42);
assert_eq!(got.frame_start, 7);
}

#[test]
fn frame_start_absent_before_lite05() {
// Lite03/Lite04 don't carry the frame start varint, so it always decodes as 0.
let got = roundtrip(Version::Lite04, &sample());
assert_eq!(got.group, 42);
assert_eq!(got.frame_start, 0);

// The lite-04 encoding is strictly shorter (no trailing frame start varint).
let mut buf04 = Vec::new();
sample().encode_msg(&mut buf04, Version::Lite04).unwrap();
let mut buf05 = Vec::new();
sample().encode_msg(&mut buf05, Version::Lite05Wip).unwrap();
assert!(buf05.len() > buf04.len(), "lite-05 carries an extra frame start varint");
}
}
Loading