From 33da311c05daf0d899e3b5334f3e00cf7955c7eb Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Wed, 5 Nov 2025 12:23:38 -0800 Subject: [PATCH 1/2] Start implementing joining fetch. --- js/moq/src/ietf/connection.ts | 4 +- js/moq/src/ietf/fetch.ts | 176 ++++++++++++++++++------ js/moq/src/ietf/frame.ts | 68 +++++++++ js/moq/src/ietf/{object.ts => group.ts} | 76 ++-------- js/moq/src/ietf/index.ts | 2 +- js/moq/src/ietf/location.ts | 22 +++ js/moq/src/ietf/publisher.ts | 5 +- js/moq/src/ietf/subscriber.ts | 13 +- rs/moq-clock/src/main.rs | 2 +- rs/moq/src/error.rs | 2 +- 10 files changed, 251 insertions(+), 119 deletions(-) create mode 100644 js/moq/src/ietf/frame.ts rename js/moq/src/ietf/{object.ts => group.ts} (50%) create mode 100644 js/moq/src/ietf/location.ts diff --git a/js/moq/src/ietf/connection.ts b/js/moq/src/ietf/connection.ts index d2783a936..b1e6eeecd 100644 --- a/js/moq/src/ietf/connection.ts +++ b/js/moq/src/ietf/connection.ts @@ -7,7 +7,7 @@ import { unreachable } from "../util/index.ts"; import * as Control from "./control.ts"; import { Fetch, FetchCancel, FetchError, FetchOk } from "./fetch.ts"; import { GoAway } from "./goaway.ts"; -import { Group } from "./object.ts"; +import { GroupHeader } from "./group.ts"; import { Publish, PublishError, PublishOk } from "./publish.ts"; import { PublishNamespace, @@ -263,7 +263,7 @@ export class Connection implements Established { async #runObjectStream(stream: Reader) { try { // we don't support other stream types yet - const header = await Group.decode(stream); + const header = await GroupHeader.decode(stream); console.debug("received group header", header); await this.#subscriber.handleGroup(header, stream); } catch (err) { diff --git a/js/moq/src/ietf/fetch.ts b/js/moq/src/ietf/fetch.ts index f1c875a90..477ff1e01 100644 --- a/js/moq/src/ietf/fetch.ts +++ b/js/moq/src/ietf/fetch.ts @@ -1,44 +1,71 @@ import type * as Path from "../path.ts"; import type { Reader, Writer } from "../stream.ts"; +import { Location } from "./location.js"; import * as Message from "./message.ts"; +import * as Namespace from "./namespace.ts"; +import { Parameters } from "./parameters.ts"; + +export const FetchType = { + Standalone: 0x1, + Relative: 0x2, + Absolute: 0x3, +} as const; + +export type FetchType = + | { + type: typeof FetchType.Standalone; + namespace: Path.Valid; + track: string; + start: Location; + end: Location; + } + | { + type: typeof FetchType.Relative; + subscribeId: bigint; + groupOffset: number; + } + | { + type: typeof FetchType.Absolute; + subscribeId: bigint; + groupId: number; + }; export class Fetch { static id = 0x16; requestId: bigint; - trackNamespace: Path.Valid; - trackName: string; subscriberPriority: number; groupOrder: number; - startGroup: bigint; - startObject: bigint; - endGroup: bigint; - endObject: bigint; - - constructor( - requestId: bigint, - trackNamespace: Path.Valid, - trackName: string, - subscriberPriority: number, - groupOrder: number, - startGroup: bigint, - startObject: bigint, - endGroup: bigint, - endObject: bigint, - ) { + fetchType: FetchType; + + constructor(requestId: bigint, subscriberPriority: number, groupOrder: number, fetchType: FetchType) { this.requestId = requestId; - this.trackNamespace = trackNamespace; - this.trackName = trackName; this.subscriberPriority = subscriberPriority; this.groupOrder = groupOrder; - this.startGroup = startGroup; - this.startObject = startObject; - this.endGroup = endGroup; - this.endObject = endObject; - } - - async #encode(_w: Writer): Promise { - throw new Error("FETCH messages are not supported"); + this.fetchType = fetchType; + } + + async #encode(w: Writer): Promise { + await w.u62(this.requestId); + await w.u8(this.subscriberPriority); + await w.u8(this.groupOrder); + await w.u53(this.fetchType.type); + if (this.fetchType.type === FetchType.Standalone) { + await Namespace.encode(w, this.fetchType.namespace); + await w.string(this.fetchType.track); + this.fetchType.start.encode(w); + this.fetchType.end.encode(w); + } else if (this.fetchType.type === FetchType.Relative) { + await w.u62(this.fetchType.subscribeId); + await w.u53(this.fetchType.groupOffset); + } else if (this.fetchType.type === FetchType.Absolute) { + await w.u62(this.fetchType.subscribeId); + await w.u53(this.fetchType.groupId); + } else { + const fetchType: never = this.fetchType; + throw new Error(`unknown fetch type: ${fetchType}`); + } + await w.u53(0); // no parameters } async encode(w: Writer): Promise { @@ -49,8 +76,50 @@ export class Fetch { return Message.decode(r, Fetch.#decode); } - static async #decode(_r: Reader): Promise { - throw new Error("FETCH messages are not supported"); + static async #decode(r: Reader): Promise { + const requestId = await r.u62(); + const subscriberPriority = await r.u8(); + const groupOrder = await r.u8(); + const fetchType = await r.u53(); + + if (fetchType === FetchType.Standalone) { + const namespace = await Namespace.decode(r); + const track = await r.string(); + const start = await Location.decode(r); + const end = await Location.decode(r); + await Parameters.decode(r); // ignore parameters + return new Fetch(requestId, subscriberPriority, groupOrder, { + type: FetchType.Standalone, + namespace, + track, + start, + end, + }); + } + + if (fetchType === FetchType.Relative) { + const subscribeId = await r.u62(); + const groupOffset = await r.u53(); + await Parameters.decode(r); // ignore parameters + return new Fetch(requestId, subscriberPriority, groupOrder, { + type: FetchType.Relative, + subscribeId, + groupOffset, + }); + } + + if (fetchType === FetchType.Absolute) { + const subscribeId = await r.u62(); + const groupId = await r.u53(); + await Parameters.decode(r); // ignore parameters + return new Fetch(requestId, subscriberPriority, groupOrder, { + type: FetchType.Absolute, + subscribeId, + groupId, + }); + } + + throw new Error(`unknown fetch type: ${fetchType}`); } } @@ -58,13 +127,23 @@ export class FetchOk { static id = 0x18; requestId: bigint; + groupOrder: number; + endOfTrack: boolean; + endLocation: Location; - constructor(requestId: bigint) { + constructor(requestId: bigint, groupOrder: number, endOfTrack: boolean, endLocation: Location) { this.requestId = requestId; + this.groupOrder = groupOrder; + this.endOfTrack = endOfTrack; + this.endLocation = endLocation; } - async #encode(_w: Writer): Promise { - throw new Error("FETCH_OK messages are not supported"); + async #encode(w: Writer): Promise { + await w.u62(this.requestId); + await w.u8(this.groupOrder); + await w.bool(this.endOfTrack); + this.endLocation.encode(w); + await w.u53(0); // no parameters } async encode(w: Writer): Promise { @@ -75,8 +154,13 @@ export class FetchOk { return Message.decode(r, FetchOk.#decode); } - static async #decode(_r: Reader): Promise { - throw new Error("FETCH_OK messages are not supported"); + static async #decode(r: Reader): Promise { + const requestId = await r.u62(); + const groupOrder = await r.u8(); + const endOfTrack = await r.bool(); + const endLocation = await Location.decode(r); + await Parameters.decode(r); // ignore parameters + return new FetchOk(requestId, groupOrder, endOfTrack, endLocation); } } @@ -93,8 +177,10 @@ export class FetchError { this.reasonPhrase = reasonPhrase; } - async #encode(_w: Writer): Promise { - throw new Error("FETCH_ERROR messages are not supported"); + async #encode(w: Writer): Promise { + await w.u62(this.requestId); + await w.u53(this.errorCode); + await w.string(this.reasonPhrase); } async encode(w: Writer): Promise { @@ -105,8 +191,11 @@ export class FetchError { return Message.decode(r, FetchError.#decode); } - static async #decode(_r: Reader): Promise { - throw new Error("FETCH_ERROR messages are not supported"); + static async #decode(r: Reader): Promise { + const requestId = await r.u62(); + const errorCode = await r.u53(); + const reasonPhrase = await r.string(); + return new FetchError(requestId, errorCode, reasonPhrase); } } @@ -119,8 +208,8 @@ export class FetchCancel { this.requestId = requestId; } - async #encode(_w: Writer): Promise { - throw new Error("FETCH_CANCEL messages are not supported"); + async #encode(w: Writer): Promise { + await w.u62(this.requestId); } async encode(w: Writer): Promise { @@ -131,7 +220,8 @@ export class FetchCancel { return Message.decode(r, FetchCancel.#decode); } - static async #decode(_r: Reader): Promise { - throw new Error("FETCH_CANCEL messages are not supported"); + static async #decode(r: Reader): Promise { + const requestId = await r.u62(); + return new FetchCancel(requestId); } } diff --git a/js/moq/src/ietf/frame.ts b/js/moq/src/ietf/frame.ts new file mode 100644 index 000000000..ac2d79be5 --- /dev/null +++ b/js/moq/src/ietf/frame.ts @@ -0,0 +1,68 @@ +import type { Reader, Writer } from "../stream.ts"; +import type { GroupFlags } from "./group.ts"; + +const GROUP_END = 0x03; + +export class Frame { + // undefined means end of group + payload?: Uint8Array; + + constructor(payload?: Uint8Array) { + this.payload = payload; + } + + async encode(w: Writer, flags: GroupFlags): Promise { + await w.u53(0); // id_delta = 0 + + if (flags.hasExtensions) { + await w.u53(0); // extensions length = 0 + } + + if (this.payload !== undefined) { + await w.u53(this.payload.byteLength); + + if (this.payload.byteLength === 0) { + await w.u53(0); // status = normal + } else { + await w.write(this.payload); + } + } else { + await w.u53(0); // length = 0 + await w.u53(GROUP_END); + } + } + + static async decode(r: Reader, flags: GroupFlags): Promise { + console.debug("reading frame delta"); + const delta = await r.u53(); + console.debug("read frame delta", delta); + if (delta !== 0) { + console.warn(`object ID delta is not supported, ignoring: ${delta}`); + } + + if (flags.hasExtensions) { + const extensionsLength = await r.u53(); + // We don't care about extensions + await r.read(extensionsLength); + } + + const payloadLength = await r.u53(); + + if (payloadLength > 0) { + const payload = await r.read(payloadLength); + return new Frame(payload); + } + + const status = await r.u53(); + + if (flags.hasEnd) { + // Empty frame + if (status === 0) return new Frame(new Uint8Array(0)); + } else if (status === 0 || status === GROUP_END) { + // TODO status === 0 should be an empty frame, but moq-rs seems to be sending it incorrectly on group end. + return new Frame(); + } + + throw new Error(`Unsupported object status: ${status}`); + } +} diff --git a/js/moq/src/ietf/object.ts b/js/moq/src/ietf/group.ts similarity index 50% rename from js/moq/src/ietf/object.ts rename to js/moq/src/ietf/group.ts index e37ec0c87..e5b8542ef 100644 --- a/js/moq/src/ietf/object.ts +++ b/js/moq/src/ietf/group.ts @@ -1,6 +1,10 @@ -import type { Reader, Writer } from "../stream.ts"; +import type { Reader, Writer } from "../stream"; -const GROUP_END = 0x03; +export const GroupOrder = { + Any: 0x0, + Ascending: 0x1, + Descending: 0x2, +} as const; export interface GroupFlags { hasExtensions: boolean; @@ -13,7 +17,7 @@ export interface GroupFlags { * STREAM_HEADER_SUBGROUP from moq-transport spec. * Used for stream-per-group delivery mode. */ -export class Group { +export class GroupHeader { flags: GroupFlags; trackAlias: bigint; groupId: number; @@ -55,7 +59,7 @@ export class Group { await w.u8(0); // publisher priority } - static async decode(r: Reader): Promise { + static async decode(r: Reader): Promise { const id = await r.u53(); if (id < 0x10 || id > 0x1f) { throw new Error(`Unsupported group type: ${id}`); @@ -73,68 +77,6 @@ export class Group { const subGroupId = flags.hasSubgroup ? await r.u53() : 0; const publisherPriority = await r.u8(); // Don't care about publisher priority - return new Group(trackAlias, groupId, subGroupId, publisherPriority, flags); - } -} - -export class Frame { - // undefined means end of group - payload?: Uint8Array; - - constructor(payload?: Uint8Array) { - this.payload = payload; - } - - async encode(w: Writer, flags: GroupFlags): Promise { - await w.u53(0); // id_delta = 0 - - if (flags.hasExtensions) { - await w.u53(0); // extensions length = 0 - } - - if (this.payload !== undefined) { - await w.u53(this.payload.byteLength); - - if (this.payload.byteLength === 0) { - await w.u53(0); // status = normal - } else { - await w.write(this.payload); - } - } else { - await w.u53(0); // length = 0 - await w.u53(GROUP_END); - } - } - - static async decode(r: Reader, flags: GroupFlags): Promise { - const delta = await r.u53(); - if (delta !== 0) { - console.warn(`object ID delta is not supported, ignoring: ${delta}`); - } - - if (flags.hasExtensions) { - const extensionsLength = await r.u53(); - // We don't care about extensions - await r.read(extensionsLength); - } - - const payloadLength = await r.u53(); - - if (payloadLength > 0) { - const payload = await r.read(payloadLength); - return new Frame(payload); - } - - const status = await r.u53(); - - if (flags.hasEnd) { - // Empty frame - if (status === 0) return new Frame(new Uint8Array(0)); - } else if (status === 0 || status === GROUP_END) { - // TODO status === 0 should be an empty frame, but moq-rs seems to be sending it incorrectly on group end. - return new Frame(); - } - - throw new Error(`Unsupported object status: ${status}`); + return new GroupHeader(trackAlias, groupId, subGroupId, publisherPriority, flags); } } diff --git a/js/moq/src/ietf/index.ts b/js/moq/src/ietf/index.ts index 92e12818c..dcc89b0e5 100644 --- a/js/moq/src/ietf/index.ts +++ b/js/moq/src/ietf/index.ts @@ -1,7 +1,7 @@ export * from "./connection.ts"; export * from "./control.ts"; +export * from "./frame.ts"; export * from "./goaway.ts"; -export * from "./object.ts"; export * from "./parameters.ts"; export * from "./publish.ts"; export * from "./publish_namespace.ts"; diff --git a/js/moq/src/ietf/location.ts b/js/moq/src/ietf/location.ts new file mode 100644 index 000000000..a4e7631f3 --- /dev/null +++ b/js/moq/src/ietf/location.ts @@ -0,0 +1,22 @@ +import type { Reader, Writer } from "../stream"; + +export class Location { + group: bigint; + object: bigint; + + constructor(group: bigint, object: bigint) { + this.group = group; + this.object = object; + } + + async encode(w: Writer): Promise { + await w.u62(this.group); + await w.u62(this.object); + } + + static async decode(r: Reader): Promise { + const group = await r.u62(); + const object = await r.u62(); + return new Location(group, object); + } +} diff --git a/js/moq/src/ietf/publisher.ts b/js/moq/src/ietf/publisher.ts index 29cf7325a..d4756a881 100644 --- a/js/moq/src/ietf/publisher.ts +++ b/js/moq/src/ietf/publisher.ts @@ -5,7 +5,8 @@ import { Writer } from "../stream.ts"; import type { Track } from "../track.ts"; import { error } from "../util/error.ts"; import type * as Control from "./control.ts"; -import { Frame, Group as GroupMessage } from "./object.ts"; +import { Frame } from "./frame.ts"; +import { GroupHeader } from "./group.ts"; import { PublishNamespace, type PublishNamespaceCancel, @@ -142,7 +143,7 @@ export class Publisher { const stream = await Writer.open(this.#quic); // Write STREAM_HEADER_SUBGROUP - const header = new GroupMessage(requestId, group.sequence, 0, 0, { + const header = new GroupHeader(requestId, group.sequence, 0, 0, { hasExtensions: false, hasSubgroup: false, hasSubgroupObject: false, diff --git a/js/moq/src/ietf/subscriber.ts b/js/moq/src/ietf/subscriber.ts index 63a47499e..a706677e9 100644 --- a/js/moq/src/ietf/subscriber.ts +++ b/js/moq/src/ietf/subscriber.ts @@ -6,7 +6,9 @@ import type { Reader } from "../stream.ts"; import type { Track } from "../track.ts"; import { error } from "../util/error.ts"; import type * as Control from "./control.ts"; -import { Frame, type Group as GroupMessage } from "./object.ts"; +import { Fetch, FetchType } from "./fetch.ts"; +import { Frame } from "./frame.ts"; +import type { GroupHeader } from "./group.ts"; import { type Publish, PublishError } from "./publish.ts"; import type { PublishNamespace, PublishNamespaceDone } from "./publish_namespace.ts"; import { type PublishDone, Subscribe, type SubscribeError, type SubscribeOk, Unsubscribe } from "./subscribe.ts"; @@ -129,6 +131,13 @@ export class Subscriber { await this.#control.write(msg); + // We also need to issue a joining fetch otherwise we miss parts of the first group. + const fetch = new Fetch(requestId, request.priority, 0, { + type: FetchType.Relative, + subscribeId: requestId, + groupOffset: 0, + }); + try { const ok = await responsePromise; this.#trackAliases.set(ok.trackAlias, requestId); @@ -187,7 +196,7 @@ export class Subscriber { * * @internal */ - async handleGroup(group: GroupMessage, stream: Reader) { + async handleGroup(group: GroupHeader, stream: Reader) { const producer = new Group(group.groupId); if (group.subGroupId !== 0) { diff --git a/rs/moq-clock/src/main.rs b/rs/moq-clock/src/main.rs index bd5a1345e..b4f74b567 100644 --- a/rs/moq-clock/src/main.rs +++ b/rs/moq-clock/src/main.rs @@ -46,7 +46,7 @@ async fn main() -> anyhow::Result<()> { let client = config.client.init()?; - tracing::info!(url = ?config.url, "connecting to server"); + tracing::info!(url = %config.url, "connecting to server"); let session = client.connect(config.url).await?; diff --git a/rs/moq/src/error.rs b/rs/moq/src/error.rs index 9dc33c915..720ecc95e 100644 --- a/rs/moq/src/error.rs +++ b/rs/moq/src/error.rs @@ -56,7 +56,7 @@ pub enum Error { #[error("not found")] NotFound, - #[error("wrong frame size")] + #[error("wrong size")] WrongSize, #[error("protocol violation")] From 15562dab2e7febb13f7d5cc6ff074e0c4dc8963b Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Wed, 5 Nov 2025 16:40:21 -0800 Subject: [PATCH 2/2] Maybe JS joining fetch. --- js/moq/src/ietf/connection.ts | 25 ++-- js/moq/src/ietf/fetch.ts | 104 +++++++++++++-- js/moq/src/ietf/frame.ts | 68 ---------- js/moq/src/ietf/group.ts | 98 ++++++++++++++- js/moq/src/ietf/index.ts | 1 - js/moq/src/ietf/publish.ts | 9 +- js/moq/src/ietf/publisher.ts | 31 ++++- js/moq/src/ietf/subscriber.ts | 230 ++++++++++++++++++++++++---------- js/moq/src/stream.ts | 5 + 9 files changed, 410 insertions(+), 161 deletions(-) delete mode 100644 js/moq/src/ietf/frame.ts diff --git a/js/moq/src/ietf/connection.ts b/js/moq/src/ietf/connection.ts index b1e6eeecd..58f43b037 100644 --- a/js/moq/src/ietf/connection.ts +++ b/js/moq/src/ietf/connection.ts @@ -5,7 +5,7 @@ import * as Path from "../path.js"; import { type Reader, Readers, type Stream } from "../stream.ts"; import { unreachable } from "../util/index.ts"; import * as Control from "./control.ts"; -import { Fetch, FetchCancel, FetchError, FetchOk } from "./fetch.ts"; +import { Fetch, FetchCancel, FetchError, FetchHeader, FetchOk } from "./fetch.ts"; import { GoAway } from "./goaway.ts"; import { GroupHeader } from "./group.ts"; import { Publish, PublishError, PublishOk } from "./publish.ts"; @@ -183,13 +183,13 @@ export class Connection implements Established { } else if (msg instanceof PublishError) { throw new Error("PUBLISH_ERROR messages are not supported"); } else if (msg instanceof Fetch) { - throw new Error("FETCH messages are not supported"); + await this.#publisher.handleFetch(msg); } else if (msg instanceof FetchOk) { - throw new Error("FETCH_OK messages are not supported"); + this.#subscriber.handleFetchOk(msg); } else if (msg instanceof FetchError) { - throw new Error("FETCH_ERROR messages are not supported"); + this.#subscriber.handleFetchError(msg); } else if (msg instanceof FetchCancel) { - throw new Error("FETCH_CANCEL messages are not supported"); + this.#publisher.handleFetchCancel(msg); } else if (msg instanceof MaxRequestId) { this.#control.maxRequestId(msg.requestId); } else if (msg instanceof RequestsBlocked) { @@ -262,10 +262,17 @@ export class Connection implements Established { */ async #runObjectStream(stream: Reader) { try { - // we don't support other stream types yet - const header = await GroupHeader.decode(stream); - console.debug("received group header", header); - await this.#subscriber.handleGroup(header, stream); + // TODO support varints for stream type; this is not correct + const typ = (await stream.peek(1))[0]; + if (typ === FetchHeader.id) { + const fetch = await FetchHeader.decode(stream); + console.debug("received fetch header", fetch); + await this.#subscriber.handleFetch(fetch, stream); + } else { + const header = await GroupHeader.decode(stream); + console.debug("received group header", header); + await this.#subscriber.handleGroup(header, stream); + } } catch (err) { console.error("error processing object stream", err); } diff --git a/js/moq/src/ietf/fetch.ts b/js/moq/src/ietf/fetch.ts index 477ff1e01..bfa4f3894 100644 --- a/js/moq/src/ietf/fetch.ts +++ b/js/moq/src/ietf/fetch.ts @@ -1,10 +1,13 @@ import type * as Path from "../path.ts"; import type { Reader, Writer } from "../stream.ts"; +import { GroupOrder } from "./group.ts"; import { Location } from "./location.js"; import * as Message from "./message.ts"; import * as Namespace from "./namespace.ts"; import { Parameters } from "./parameters.ts"; +const FETCH_END = 0x03; + export const FetchType = { Standalone: 0x1, Relative: 0x2, @@ -35,10 +38,10 @@ export class Fetch { requestId: bigint; subscriberPriority: number; - groupOrder: number; + groupOrder: GroupOrder; fetchType: FetchType; - constructor(requestId: bigint, subscriberPriority: number, groupOrder: number, fetchType: FetchType) { + constructor(requestId: bigint, subscriberPriority: number, groupOrder: GroupOrder, fetchType: FetchType) { this.requestId = requestId; this.subscriberPriority = subscriberPriority; this.groupOrder = groupOrder; @@ -48,7 +51,7 @@ export class Fetch { async #encode(w: Writer): Promise { await w.u62(this.requestId); await w.u8(this.subscriberPriority); - await w.u8(this.groupOrder); + await this.groupOrder.encode(w); await w.u53(this.fetchType.type); if (this.fetchType.type === FetchType.Standalone) { await Namespace.encode(w, this.fetchType.namespace); @@ -79,7 +82,7 @@ export class Fetch { static async #decode(r: Reader): Promise { const requestId = await r.u62(); const subscriberPriority = await r.u8(); - const groupOrder = await r.u8(); + const groupOrder = await GroupOrder.decode(r); const fetchType = await r.u53(); if (fetchType === FetchType.Standalone) { @@ -127,11 +130,11 @@ export class FetchOk { static id = 0x18; requestId: bigint; - groupOrder: number; + groupOrder: GroupOrder; endOfTrack: boolean; endLocation: Location; - constructor(requestId: bigint, groupOrder: number, endOfTrack: boolean, endLocation: Location) { + constructor(requestId: bigint, groupOrder: GroupOrder, endOfTrack: boolean, endLocation: Location) { this.requestId = requestId; this.groupOrder = groupOrder; this.endOfTrack = endOfTrack; @@ -140,7 +143,7 @@ export class FetchOk { async #encode(w: Writer): Promise { await w.u62(this.requestId); - await w.u8(this.groupOrder); + await this.groupOrder.encode(w); await w.bool(this.endOfTrack); this.endLocation.encode(w); await w.u53(0); // no parameters @@ -156,7 +159,7 @@ export class FetchOk { static async #decode(r: Reader): Promise { const requestId = await r.u62(); - const groupOrder = await r.u8(); + const groupOrder = await GroupOrder.decode(r); const endOfTrack = await r.bool(); const endLocation = await Location.decode(r); await Parameters.decode(r); // ignore parameters @@ -225,3 +228,88 @@ export class FetchCancel { return new FetchCancel(requestId); } } + +export class FetchHeader { + static id = 0x5; + + requestId: bigint; + + constructor(requestId: bigint) { + this.requestId = requestId; + } + + async encode(w: Writer): Promise { + await w.u62(this.requestId); + } + + static async decode(r: Reader): Promise { + const requestId = await r.u62(); + return new FetchHeader(requestId); + } +} + +export class FetchObject { + groupId: number; + subgroupId: number; + objectId: number; + publisherPriority: number; + payload?: Uint8Array; + + constructor( + groupId: number, + subgroupId: number, + objectId: number, + publisherPriority: number, + payload?: Uint8Array, + ) { + this.groupId = groupId; + this.subgroupId = subgroupId; + this.objectId = objectId; + this.publisherPriority = publisherPriority; + this.payload = payload; + } + + async encode(w: Writer): Promise { + await w.u53(this.groupId); + await w.u53(this.subgroupId); + await w.u53(this.objectId); + await w.u8(this.publisherPriority); + await w.u53(0); // no extension headers + + if (this.payload !== undefined) { + await w.u53(this.payload.byteLength); + if (this.payload.byteLength === 0) { + await w.u53(0); // status = normal + } else { + await w.write(this.payload); + } + } else { + await w.u53(0); // no payload, length = 0 + await w.u53(FETCH_END); // no payload, status = end + } + } + + static async decode(r: Reader): Promise { + const groupId = await r.u53(); + const subgroupId = await r.u53(); + const objectId = await r.u53(); + const publisherPriority = await r.u8(); + const payloadLength = await r.u53(); + + let payload: Uint8Array | undefined; + if (payloadLength === 0) { + const status = await r.u53(); + if (status === 0) { + payload = new Uint8Array(0); + } else if (status === FETCH_END) { + payload = undefined; + } else { + throw new Error(`unexpected status: ${status}`); + } + } else { + payload = await r.read(payloadLength); + } + + return new FetchObject(groupId, subgroupId, objectId, publisherPriority, payload); + } +} diff --git a/js/moq/src/ietf/frame.ts b/js/moq/src/ietf/frame.ts deleted file mode 100644 index ac2d79be5..000000000 --- a/js/moq/src/ietf/frame.ts +++ /dev/null @@ -1,68 +0,0 @@ -import type { Reader, Writer } from "../stream.ts"; -import type { GroupFlags } from "./group.ts"; - -const GROUP_END = 0x03; - -export class Frame { - // undefined means end of group - payload?: Uint8Array; - - constructor(payload?: Uint8Array) { - this.payload = payload; - } - - async encode(w: Writer, flags: GroupFlags): Promise { - await w.u53(0); // id_delta = 0 - - if (flags.hasExtensions) { - await w.u53(0); // extensions length = 0 - } - - if (this.payload !== undefined) { - await w.u53(this.payload.byteLength); - - if (this.payload.byteLength === 0) { - await w.u53(0); // status = normal - } else { - await w.write(this.payload); - } - } else { - await w.u53(0); // length = 0 - await w.u53(GROUP_END); - } - } - - static async decode(r: Reader, flags: GroupFlags): Promise { - console.debug("reading frame delta"); - const delta = await r.u53(); - console.debug("read frame delta", delta); - if (delta !== 0) { - console.warn(`object ID delta is not supported, ignoring: ${delta}`); - } - - if (flags.hasExtensions) { - const extensionsLength = await r.u53(); - // We don't care about extensions - await r.read(extensionsLength); - } - - const payloadLength = await r.u53(); - - if (payloadLength > 0) { - const payload = await r.read(payloadLength); - return new Frame(payload); - } - - const status = await r.u53(); - - if (flags.hasEnd) { - // Empty frame - if (status === 0) return new Frame(new Uint8Array(0)); - } else if (status === 0 || status === GROUP_END) { - // TODO status === 0 should be an empty frame, but moq-rs seems to be sending it incorrectly on group end. - return new Frame(); - } - - throw new Error(`Unsupported object status: ${status}`); - } -} diff --git a/js/moq/src/ietf/group.ts b/js/moq/src/ietf/group.ts index e5b8542ef..161733498 100644 --- a/js/moq/src/ietf/group.ts +++ b/js/moq/src/ietf/group.ts @@ -1,10 +1,34 @@ import type { Reader, Writer } from "../stream"; -export const GroupOrder = { - Any: 0x0, - Ascending: 0x1, - Descending: 0x2, -} as const; +export class GroupOrder { + #value: number; + + private constructor(value: number) { + this.#value = value; + } + + static readonly Any = new GroupOrder(0x0); + static readonly Ascending = new GroupOrder(0x1); + static readonly Descending = new GroupOrder(0x2); + + async encode(w: Writer): Promise { + await w.u8(this.#value); + } + + static async decode(r: Reader): Promise { + const value = await r.u8(); + switch (value) { + case 0x0: + return GroupOrder.Any; + case 0x1: + return GroupOrder.Ascending; + case 0x2: + return GroupOrder.Descending; + default: + throw new Error(`Invalid GroupOrder: ${value}`); + } + } +} export interface GroupFlags { hasExtensions: boolean; @@ -80,3 +104,67 @@ export class GroupHeader { return new GroupHeader(trackAlias, groupId, subGroupId, publisherPriority, flags); } } + +const GROUP_END = 0x03; + +export class GroupObject { + id_delta: number; + + // undefined means end of group + payload?: Uint8Array; + + constructor(id_delta: number, payload?: Uint8Array) { + this.id_delta = id_delta; + this.payload = payload; + } + + async encode(w: Writer, flags: GroupFlags): Promise { + await w.u53(this.id_delta); + + if (flags.hasExtensions) { + await w.u53(0); // extensions length = 0 + } + + if (this.payload !== undefined) { + await w.u53(this.payload.byteLength); + + if (this.payload.byteLength === 0) { + await w.u53(0); // status = normal + } else { + await w.write(this.payload); + } + } else { + await w.u53(0); // length = 0 + await w.u53(GROUP_END); + } + } + + static async decode(r: Reader, flags: GroupFlags): Promise { + const delta = await r.u53(); + + if (flags.hasExtensions) { + const extensionsLength = await r.u53(); + // We don't care about extensions + await r.read(extensionsLength); + } + + const payloadLength = await r.u53(); + + if (payloadLength > 0) { + const payload = await r.read(payloadLength); + return new GroupObject(delta, payload); + } + + const status = await r.u53(); + + if (status === 0) { + return new GroupObject(delta, new Uint8Array(0)); + } + + if (!flags.hasEnd && status === 3) { + return new GroupObject(delta); + } + + throw new Error(`Unsupported object status: ${status}`); + } +} diff --git a/js/moq/src/ietf/index.ts b/js/moq/src/ietf/index.ts index dcc89b0e5..bc89f709f 100644 --- a/js/moq/src/ietf/index.ts +++ b/js/moq/src/ietf/index.ts @@ -1,6 +1,5 @@ export * from "./connection.ts"; export * from "./control.ts"; -export * from "./frame.ts"; export * from "./goaway.ts"; export * from "./parameters.ts"; export * from "./publish.ts"; diff --git a/js/moq/src/ietf/publish.ts b/js/moq/src/ietf/publish.ts index 0e728560c..cf875a3a0 100644 --- a/js/moq/src/ietf/publish.ts +++ b/js/moq/src/ietf/publish.ts @@ -1,5 +1,6 @@ import type * as Path from "../path.ts"; import type { Reader, Writer } from "../stream.ts"; +import { GroupOrder } from "./group.ts"; import * as Message from "./message.ts"; import * as Namespace from "./namespace.ts"; import { Parameters } from "./parameters.ts"; @@ -14,7 +15,7 @@ export class Publish { trackNamespace: Path.Valid; trackName: string; trackAlias: bigint; - groupOrder: number; + groupOrder: GroupOrder; contentExists: boolean; largest: { groupId: bigint; objectId: bigint } | undefined; forward: boolean; @@ -24,7 +25,7 @@ export class Publish { trackNamespace: Path.Valid, trackName: string, trackAlias: bigint, - groupOrder: number, + groupOrder: GroupOrder, contentExists: boolean, largest: { groupId: bigint; objectId: bigint } | undefined, forward: boolean, @@ -44,7 +45,7 @@ export class Publish { await Namespace.encode(w, this.trackNamespace); await w.string(this.trackName); await w.u62(this.trackAlias); - await w.u8(this.groupOrder); + await this.groupOrder.encode(w); await w.bool(this.contentExists); if (this.contentExists !== !!this.largest) { throw new Error("contentExists and largest must both be true or false"); @@ -70,7 +71,7 @@ export class Publish { const trackNamespace = await Namespace.decode(r); const trackName = await r.string(); const trackAlias = await r.u62(); - const groupOrder = await r.u8(); + const groupOrder = await GroupOrder.decode(r); const contentExists = await r.bool(); const largest = contentExists ? { groupId: await r.u62(), objectId: await r.u62() } : undefined; const forward = await r.bool(); diff --git a/js/moq/src/ietf/publisher.ts b/js/moq/src/ietf/publisher.ts index d4756a881..ba0746302 100644 --- a/js/moq/src/ietf/publisher.ts +++ b/js/moq/src/ietf/publisher.ts @@ -5,8 +5,8 @@ import { Writer } from "../stream.ts"; import type { Track } from "../track.ts"; import { error } from "../util/error.ts"; import type * as Control from "./control.ts"; -import { Frame } from "./frame.ts"; -import { GroupHeader } from "./group.ts"; +import { type Fetch, type FetchCancel, FetchError, FetchType } from "./fetch.ts"; +import { GroupHeader, GroupObject } from "./group.ts"; import { PublishNamespace, type PublishNamespaceCancel, @@ -160,7 +160,7 @@ export class Publisher { if (!frame) break; // Write each frame as an object - const obj = new Frame(frame); + const obj = new GroupObject(0, frame); await obj.encode(stream, header.flags); } @@ -218,4 +218,29 @@ export class Publisher { async handleSubscribeNamespace(_msg: SubscribeNamespace) {} async handleUnsubscribeNamespace(_msg: UnsubscribeNamespace) {} + + async handleFetch(msg: Fetch) { + if (msg.fetchType.type === FetchType.Standalone || msg.fetchType.type === FetchType.Absolute) { + const errorMsg = new FetchError(msg.requestId, 400, "fetch type not supported"); + await this.#control.write(errorMsg); + return; + } + + if (msg.fetchType.groupOffset !== 0) { + const errorMsg = new FetchError(msg.requestId, 400, "positive group offset not supported"); + await this.#control.write(errorMsg); + return; + } + + const NO_OBJECTS = 0x6; + + // We don't even check if this was a valid subscribe request. + // Just write NO_OBJECTS because we start at the latest group anyway. + const errMsg = new FetchError(msg.requestId, NO_OBJECTS, "no objects"); + await this.#control.write(errMsg); + } + + handleFetchCancel(_msg: FetchCancel) { + // lul who cares + } } diff --git a/js/moq/src/ietf/subscriber.ts b/js/moq/src/ietf/subscriber.ts index a706677e9..accfdc8df 100644 --- a/js/moq/src/ietf/subscriber.ts +++ b/js/moq/src/ietf/subscriber.ts @@ -6,9 +6,16 @@ import type { Reader } from "../stream.ts"; import type { Track } from "../track.ts"; import { error } from "../util/error.ts"; import type * as Control from "./control.ts"; -import { Fetch, FetchType } from "./fetch.ts"; -import { Frame } from "./frame.ts"; -import type { GroupHeader } from "./group.ts"; +import { + Fetch, + FetchCancel, + type FetchError, + type FetchHeader, + FetchObject, + type FetchOk, + FetchType, +} from "./fetch.ts"; +import { type GroupHeader, GroupObject, GroupOrder } from "./group.ts"; import { type Publish, PublishError } from "./publish.ts"; import type { PublishNamespace, PublishNamespaceDone } from "./publish_namespace.ts"; import { type PublishDone, Subscribe, type SubscribeError, type SubscribeOk, Unsubscribe } from "./subscribe.ts"; @@ -20,6 +27,19 @@ import { } from "./subscribe_namespace.ts"; import type { TrackStatus } from "./track.ts"; +interface FetchState { + track: Track; + resolve: (group?: Group) => void; + reject: (error: Error) => void; +} + +interface SubscribeState { + track: Track; + fetch: Promise; + resolve: (ok: SubscribeOk) => void; + reject: (error: Error) => void; +} + /** * Handles subscribing to broadcasts using moq-transport protocol with lite-compatibility restrictions. * @@ -35,20 +55,14 @@ export class Subscriber { #announcedConsumers = new Set(); // Our subscribed tracks - keyed by request ID - #subscribes = new Map(); + #subscribes = new Map(); + + // Active fetches - keyed by request ID + #fetches = new Map(); // A map of track aliases to request IDs #trackAliases = new Map(); - // Track subscription responses - keyed by request ID - #subscribeCallbacks = new Map< - bigint, - { - resolve: (msg: SubscribeOk) => void; - reject: (msg: Error) => void; - } - >(); - /** * Creates a new Subscriber instance. * @param quic - The WebTransport session to use @@ -118,33 +132,44 @@ export class Subscriber { async #runSubscribe(broadcast: Path.Valid, request: TrackRequest) { const requestId = await this.#control.nextRequestId(); - if (requestId === undefined) return; + const fetchRequestId = await this.#control.nextRequestId(); + if (requestId === undefined || fetchRequestId === undefined) return; - this.#subscribes.set(requestId, request.track); - - const msg = new Subscribe(requestId, broadcast, request.track.name, request.priority); + try { + // Unblock when the joining fetch is complete. + const fetchPromise = new Promise((resolve, reject) => { + this.#fetches.set(fetchRequestId, { track: request.track, resolve, reject }); + }); - // Send SUBSCRIBE message on control stream and wait for response - const responsePromise = new Promise((resolve, reject) => { - this.#subscribeCallbacks.set(requestId, { resolve, reject }); - }); + // Send SUBSCRIBE message on control stream and wait for response + const subscribePromise = new Promise((resolve, reject) => { + this.#subscribes.set(requestId, { track: request.track, fetch: fetchPromise, resolve, reject }); + }); - await this.#control.write(msg); + const msg = new Subscribe(requestId, broadcast, request.track.name, request.priority); + await this.#control.write(msg); - // We also need to issue a joining fetch otherwise we miss parts of the first group. - const fetch = new Fetch(requestId, request.priority, 0, { - type: FetchType.Relative, - subscribeId: requestId, - groupOffset: 0, - }); + // We also need to issue a joining fetch otherwise we will miss parts of the first group. + // THIS IS EXTREMELY ANNOYING. + const fetch = new Fetch(fetchRequestId, request.priority, GroupOrder.Descending, { + type: FetchType.Relative, + subscribeId: requestId, + groupOffset: 0, + }); + await this.#control.write(fetch); - try { - const ok = await responsePromise; - this.#trackAliases.set(ok.trackAlias, requestId); + // Wait for the SUBSCRIBE_OK so we know the track alias. + const ok = await subscribePromise; try { + this.#trackAliases.set(ok.trackAlias, requestId); + await request.track.closed; + // TODO only send this if needed. + const fetchCancel = new FetchCancel(fetchRequestId); + await this.#control.write(fetchCancel); + const msg = new Unsubscribe(requestId); await this.#control.write(msg); } finally { @@ -155,7 +180,7 @@ export class Subscriber { request.track.close(e); } finally { this.#subscribes.delete(requestId); - this.#subscribeCallbacks.delete(requestId); + this.#fetches.delete(requestId); } } @@ -166,12 +191,13 @@ export class Subscriber { * @internal */ async handleSubscribeOk(msg: SubscribeOk) { - const callback = this.#subscribeCallbacks.get(msg.requestId); - if (callback) { - callback.resolve(msg); - } else { + const subscribe = this.#subscribes.get(msg.requestId); + if (!subscribe) { console.warn("handleSubscribeOk unknown requestId", msg.requestId); + return; } + + subscribe.resolve(msg); } /** @@ -181,12 +207,13 @@ export class Subscriber { * @internal */ async handleSubscribeError(msg: SubscribeError) { - const callback = this.#subscribeCallbacks.get(msg.requestId); - if (callback) { - callback.reject(new Error(`SUBSCRIBE_ERROR: code=${msg.errorCode} reason=${msg.reasonPhrase}`)); - } else { + const subscribe = this.#subscribes.get(msg.requestId); + if (!subscribe) { console.warn("handleSubscribeError unknown requestId", msg.requestId); + return; } + + subscribe.reject(new Error(`SUBSCRIBE_ERROR: code=${msg.errorCode} reason=${msg.reasonPhrase}`)); } /** @@ -197,36 +224,42 @@ export class Subscriber { * @internal */ async handleGroup(group: GroupHeader, stream: Reader) { - const producer = new Group(group.groupId); - if (group.subGroupId !== 0) { - console.warn("subgroup ID is not supported, ignoring"); + throw new Error(`subgroup ID is not supported: ${group.subGroupId}`); } - try { - let requestId = this.#trackAliases.get(group.trackAlias); - if (requestId === undefined) { - // Just hope the track alias is the request ID - requestId = group.trackAlias; - console.warn("unknown track alias, using request ID"); - } + let requestId = this.#trackAliases.get(group.trackAlias); + if (requestId === undefined) { + // Just hope the track alias is the request ID + requestId = group.trackAlias; + console.warn("unknown track alias, using request ID"); + } - const track = this.#subscribes.get(requestId); - if (!track) { - throw new Error( - `unknown track: trackAlias=${group.trackAlias} requestId=${this.#trackAliases.get(group.trackAlias)}`, - ); - } + const subscribe = this.#subscribes.get(requestId); + if (!subscribe) { + throw new Error( + `unknown subscribe: trackAlias=${group.trackAlias} requestId=${this.#trackAliases.get(group.trackAlias)}`, + ); + } + let producer: Group; - // Convert to Group (moq-lite equivalent) - track.writeGroup(producer); + // Ugh we have to make sure the joining fetch is complete. + const first = await subscribe.fetch; + if (first && first.sequence === group.groupId) { + // Continue where the joining fetch left off. + producer = first; + } else { + producer = new Group(group.groupId); + subscribe.track.writeGroup(producer); + } + try { // Read objects from the stream until end of group for (;;) { - const done = await Promise.race([stream.done(), producer.closed, track.closed]); + const done = await Promise.race([stream.done(), producer.closed, subscribe.track.closed]); if (done !== false) break; - const frame = await Frame.decode(stream, group.flags); + const frame = await GroupObject.decode(stream, group.flags); if (frame.payload === undefined) break; // Treat each object payload as a frame @@ -255,10 +288,13 @@ export class Subscriber { */ async handlePublishDone(msg: PublishDone) { // For lite compatibility, we treat this as subscription completion - const callback = this.#subscribeCallbacks.get(msg.requestId); - if (callback) { - callback.reject(new Error(`PUBLISH_DONE: code=${msg.statusCode} reason=${msg.reasonPhrase}`)); + const subscribe = this.#subscribes.get(msg.requestId); + if (!subscribe) { + console.warn("handlePublishDone unknown requestId", msg.requestId); + return; } + + subscribe.track.close(); } /** @@ -316,4 +352,72 @@ export class Subscriber { async handleTrackStatus(_msg: TrackStatus) { throw new Error("TRACK_STATUS messages are not supported"); } + + async handleFetch(header: FetchHeader, stream: Reader) { + const fetch = this.#fetches.get(header.requestId); + if (!fetch) { + throw new Error(`unknown fetch: requestId=${header.requestId}`); + } + + this.#fetches.delete(header.requestId); + const { track, resolve, reject } = fetch; + + try { + let group: Group | undefined; + let nextObjectId = 0; + + for (;;) { + const done = await Promise.race([stream.done(), track.closed]); + if (done !== false) break; + + const frame = await FetchObject.decode(stream); + if (frame.payload === undefined) break; + + if (group === undefined) { + group = new Group(frame.groupId); + track.writeGroup(group); + } else if (group.sequence !== frame.groupId) { + throw new Error(`fetch returned multiple groups: ${group.sequence} !== ${frame.groupId}`); + } + + if (frame.objectId !== nextObjectId) { + throw new Error(`fetch returned object ID out of order: ${frame.objectId} !== ${nextObjectId}`); + } + + if (frame.subgroupId !== 0) { + throw new Error(`fetch returned subgroup ID: ${frame.subgroupId}`); + } + + nextObjectId++; + + track.writeFrame(frame.payload); + } + + // Send the remainder of the group to the callback. + resolve(group); + } catch (err: unknown) { + const e = error(err); + reject(e); + } + } + + handleFetchOk(msg: FetchOk) { + const fetch = this.#fetches.get(msg.requestId); + if (!fetch) { + throw new Error(`unknown fetch: requestId=${msg.requestId}`); + } + + if (msg.endOfTrack) { + console.warn("TODO handle end of track"); + } + } + + handleFetchError(msg: FetchError) { + const fetch = this.#fetches.get(msg.requestId); + if (!fetch) { + throw new Error(`unknown fetch: requestId=${msg.requestId}`); + } + + fetch.reject(new Error(`FETCH_ERROR: code=${msg.errorCode} reason=${msg.reasonPhrase}`)); + } } diff --git a/js/moq/src/stream.ts b/js/moq/src/stream.ts index 9e5d053e5..299ceaebf 100644 --- a/js/moq/src/stream.ts +++ b/js/moq/src/stream.ts @@ -132,6 +132,11 @@ export class Reader { return this.#slice(this.#buffer.byteLength); } + async peek(size: number): Promise { + await this.#fillTo(size); + return new Uint8Array(this.#buffer.buffer, this.#buffer.byteOffset, size); + } + async string(): Promise { const length = await this.u53(); const buffer = await this.read(length);