Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 17 additions & 10 deletions js/moq/src/ietf/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import * as Path from "../path.js";
import { type Reader, Readers, type Stream } from "../stream.ts";
import { unreachable } from "../util/index.ts";
import * as Control from "./control.ts";
import { Fetch, FetchCancel, FetchError, FetchOk } from "./fetch.ts";
import { Fetch, FetchCancel, FetchError, FetchHeader, FetchOk } from "./fetch.ts";
import { GoAway } from "./goaway.ts";
import { Group } from "./object.ts";
import { GroupHeader } from "./group.ts";
import { Publish, PublishError, PublishOk } from "./publish.ts";
import {
PublishNamespace,
Expand Down Expand Up @@ -183,13 +183,13 @@ export class Connection implements Established {
} else if (msg instanceof PublishError) {
throw new Error("PUBLISH_ERROR messages are not supported");
} else if (msg instanceof Fetch) {
throw new Error("FETCH messages are not supported");
await this.#publisher.handleFetch(msg);
} else if (msg instanceof FetchOk) {
throw new Error("FETCH_OK messages are not supported");
this.#subscriber.handleFetchOk(msg);
} else if (msg instanceof FetchError) {
throw new Error("FETCH_ERROR messages are not supported");
this.#subscriber.handleFetchError(msg);
} else if (msg instanceof FetchCancel) {
throw new Error("FETCH_CANCEL messages are not supported");
this.#publisher.handleFetchCancel(msg);
} else if (msg instanceof MaxRequestId) {
this.#control.maxRequestId(msg.requestId);
} else if (msg instanceof RequestsBlocked) {
Expand Down Expand Up @@ -262,10 +262,17 @@ export class Connection implements Established {
*/
async #runObjectStream(stream: Reader) {
try {
// we don't support other stream types yet
const header = await Group.decode(stream);
console.debug("received group header", header);
await this.#subscriber.handleGroup(header, stream);
// TODO support varints for stream type; this is not correct
const typ = (await stream.peek(1))[0];
if (typ === FetchHeader.id) {
const fetch = await FetchHeader.decode(stream);
console.debug("received fetch header", fetch);
await this.#subscriber.handleFetch(fetch, stream);
} else {
const header = await GroupHeader.decode(stream);
console.debug("received group header", header);
await this.#subscriber.handleGroup(header, stream);
}
} catch (err) {
console.error("error processing object stream", err);
}
Expand Down
260 changes: 219 additions & 41 deletions js/moq/src/ietf/fetch.ts
Original file line number Diff line number Diff line change
@@ -1,44 +1,74 @@
import type * as Path from "../path.ts";
import type { Reader, Writer } from "../stream.ts";
import { GroupOrder } from "./group.ts";
import { Location } from "./location.js";
import * as Message from "./message.ts";
import * as Namespace from "./namespace.ts";
import { Parameters } from "./parameters.ts";

const FETCH_END = 0x03;

export const FetchType = {
Standalone: 0x1,
Relative: 0x2,
Absolute: 0x3,
} as const;

export type FetchType =
| {
type: typeof FetchType.Standalone;
namespace: Path.Valid;
track: string;
start: Location;
end: Location;
}
| {
type: typeof FetchType.Relative;
subscribeId: bigint;
groupOffset: number;
}
| {
type: typeof FetchType.Absolute;
subscribeId: bigint;
groupId: number;
};

export class Fetch {
static id = 0x16;

requestId: bigint;
trackNamespace: Path.Valid;
trackName: string;
subscriberPriority: number;
groupOrder: number;
startGroup: bigint;
startObject: bigint;
endGroup: bigint;
endObject: bigint;
groupOrder: GroupOrder;
fetchType: FetchType;

constructor(
requestId: bigint,
trackNamespace: Path.Valid,
trackName: string,
subscriberPriority: number,
groupOrder: number,
startGroup: bigint,
startObject: bigint,
endGroup: bigint,
endObject: bigint,
) {
constructor(requestId: bigint, subscriberPriority: number, groupOrder: GroupOrder, fetchType: FetchType) {
this.requestId = requestId;
this.trackNamespace = trackNamespace;
this.trackName = trackName;
this.subscriberPriority = subscriberPriority;
this.groupOrder = groupOrder;
this.startGroup = startGroup;
this.startObject = startObject;
this.endGroup = endGroup;
this.endObject = endObject;
this.fetchType = fetchType;
}

async #encode(_w: Writer): Promise<void> {
throw new Error("FETCH messages are not supported");
async #encode(w: Writer): Promise<void> {
await w.u62(this.requestId);
await w.u8(this.subscriberPriority);
await this.groupOrder.encode(w);
await w.u53(this.fetchType.type);
if (this.fetchType.type === FetchType.Standalone) {
await Namespace.encode(w, this.fetchType.namespace);
await w.string(this.fetchType.track);
this.fetchType.start.encode(w);
this.fetchType.end.encode(w);
} else if (this.fetchType.type === FetchType.Relative) {
Comment on lines +57 to +61
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Await the Location encodes in Fetch.#encode.

Line 59 and Line 60 call Location.encode(...) without awaiting, so any write failure becomes an unhandled rejection and the message may finish encoding before those coordinates are on the wire. Please await both calls to keep the frame deterministic and error-safe.

-			this.fetchType.start.encode(w);
-			this.fetchType.end.encode(w);
+			await this.fetchType.start.encode(w);
+			await this.fetchType.end.encode(w);
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
await Namespace.encode(w, this.fetchType.namespace);
await w.string(this.fetchType.track);
this.fetchType.start.encode(w);
this.fetchType.end.encode(w);
} else if (this.fetchType.type === FetchType.Relative) {
await Namespace.encode(w, this.fetchType.namespace);
await w.string(this.fetchType.track);
await this.fetchType.start.encode(w);
await this.fetchType.end.encode(w);
} else if (this.fetchType.type === FetchType.Relative) {
🤖 Prompt for AI Agents
In js/moq/src/ietf/fetch.ts around lines 57 to 61, the calls to
this.fetchType.start.encode(w) and this.fetchType.end.encode(w) are not awaited
which can cause unhandled rejections and non-deterministic framing; update the
method to await both Location.encode calls (i.e., await
this.fetchType.start.encode(w) and await this.fetchType.end.encode(w)) so write
failures propagate correctly and encoding completes deterministically.

await w.u62(this.fetchType.subscribeId);
await w.u53(this.fetchType.groupOffset);
} else if (this.fetchType.type === FetchType.Absolute) {
await w.u62(this.fetchType.subscribeId);
await w.u53(this.fetchType.groupId);
} else {
const fetchType: never = this.fetchType;
throw new Error(`unknown fetch type: ${fetchType}`);
}
await w.u53(0); // no parameters
}

async encode(w: Writer): Promise<void> {
Expand All @@ -49,22 +79,74 @@ export class Fetch {
return Message.decode(r, Fetch.#decode);
}

static async #decode(_r: Reader): Promise<Fetch> {
throw new Error("FETCH messages are not supported");
static async #decode(r: Reader): Promise<Fetch> {
const requestId = await r.u62();
const subscriberPriority = await r.u8();
const groupOrder = await GroupOrder.decode(r);
const fetchType = await r.u53();

if (fetchType === FetchType.Standalone) {
const namespace = await Namespace.decode(r);
const track = await r.string();
const start = await Location.decode(r);
const end = await Location.decode(r);
await Parameters.decode(r); // ignore parameters
return new Fetch(requestId, subscriberPriority, groupOrder, {
type: FetchType.Standalone,
namespace,
track,
start,
end,
});
}

if (fetchType === FetchType.Relative) {
const subscribeId = await r.u62();
const groupOffset = await r.u53();
await Parameters.decode(r); // ignore parameters
return new Fetch(requestId, subscriberPriority, groupOrder, {
type: FetchType.Relative,
subscribeId,
groupOffset,
});
}

if (fetchType === FetchType.Absolute) {
const subscribeId = await r.u62();
const groupId = await r.u53();
await Parameters.decode(r); // ignore parameters
return new Fetch(requestId, subscriberPriority, groupOrder, {
type: FetchType.Absolute,
subscribeId,
groupId,
});
}

throw new Error(`unknown fetch type: ${fetchType}`);
}
}

export class FetchOk {
static id = 0x18;

requestId: bigint;
groupOrder: GroupOrder;
endOfTrack: boolean;
endLocation: Location;

constructor(requestId: bigint) {
constructor(requestId: bigint, groupOrder: GroupOrder, endOfTrack: boolean, endLocation: Location) {
this.requestId = requestId;
this.groupOrder = groupOrder;
this.endOfTrack = endOfTrack;
this.endLocation = endLocation;
}

async #encode(_w: Writer): Promise<void> {
throw new Error("FETCH_OK messages are not supported");
async #encode(w: Writer): Promise<void> {
await w.u62(this.requestId);
await this.groupOrder.encode(w);
await w.bool(this.endOfTrack);
this.endLocation.encode(w);
await w.u53(0); // no parameters
Comment on lines +145 to +149
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Await endLocation.encode inside FetchOk.#encode.

Location.encode is async; Line 148 invokes it without await, which drops any rejection on the floor and allows the outer encode to resolve before the location bytes are flushed. Await it so failures propagate and the frame stays well-ordered.

-		this.endLocation.encode(w);
+		await this.endLocation.encode(w);
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
await w.u62(this.requestId);
await this.groupOrder.encode(w);
await w.bool(this.endOfTrack);
this.endLocation.encode(w);
await w.u53(0); // no parameters
await w.u62(this.requestId);
await this.groupOrder.encode(w);
await w.bool(this.endOfTrack);
await this.endLocation.encode(w);
await w.u53(0); // no parameters
🤖 Prompt for AI Agents
In js/moq/src/ietf/fetch.ts around lines 145 to 149, the call to
this.endLocation.encode(w) is missing an await, which allows its async rejection
to be lost and permits the outer encode to finish before location bytes are
flushed; update the code to await this.endLocation.encode(w) so the method waits
for completion and any errors propagate, keeping frame ordering correct.

}

async encode(w: Writer): Promise<void> {
Expand All @@ -75,8 +157,13 @@ export class FetchOk {
return Message.decode(r, FetchOk.#decode);
}

static async #decode(_r: Reader): Promise<FetchOk> {
throw new Error("FETCH_OK messages are not supported");
static async #decode(r: Reader): Promise<FetchOk> {
const requestId = await r.u62();
const groupOrder = await GroupOrder.decode(r);
const endOfTrack = await r.bool();
const endLocation = await Location.decode(r);
await Parameters.decode(r); // ignore parameters
return new FetchOk(requestId, groupOrder, endOfTrack, endLocation);
}
}

Expand All @@ -93,8 +180,10 @@ export class FetchError {
this.reasonPhrase = reasonPhrase;
}

async #encode(_w: Writer): Promise<void> {
throw new Error("FETCH_ERROR messages are not supported");
async #encode(w: Writer): Promise<void> {
await w.u62(this.requestId);
await w.u53(this.errorCode);
await w.string(this.reasonPhrase);
}

async encode(w: Writer): Promise<void> {
Expand All @@ -105,8 +194,11 @@ export class FetchError {
return Message.decode(r, FetchError.#decode);
}

static async #decode(_r: Reader): Promise<FetchError> {
throw new Error("FETCH_ERROR messages are not supported");
static async #decode(r: Reader): Promise<FetchError> {
const requestId = await r.u62();
const errorCode = await r.u53();
const reasonPhrase = await r.string();
return new FetchError(requestId, errorCode, reasonPhrase);
}
}

Expand All @@ -119,8 +211,8 @@ export class FetchCancel {
this.requestId = requestId;
}

async #encode(_w: Writer): Promise<void> {
throw new Error("FETCH_CANCEL messages are not supported");
async #encode(w: Writer): Promise<void> {
await w.u62(this.requestId);
}

async encode(w: Writer): Promise<void> {
Expand All @@ -131,7 +223,93 @@ export class FetchCancel {
return Message.decode(r, FetchCancel.#decode);
}

static async #decode(_r: Reader): Promise<FetchCancel> {
throw new Error("FETCH_CANCEL messages are not supported");
static async #decode(r: Reader): Promise<FetchCancel> {
const requestId = await r.u62();
return new FetchCancel(requestId);
}
}

export class FetchHeader {
static id = 0x5;

requestId: bigint;

constructor(requestId: bigint) {
this.requestId = requestId;
}

async encode(w: Writer): Promise<void> {
await w.u62(this.requestId);
}

static async decode(r: Reader): Promise<FetchHeader> {
const requestId = await r.u62();
return new FetchHeader(requestId);
}
}

export class FetchObject {
groupId: number;
subgroupId: number;
objectId: number;
publisherPriority: number;
payload?: Uint8Array;

constructor(
groupId: number,
subgroupId: number,
objectId: number,
publisherPriority: number,
payload?: Uint8Array,
) {
this.groupId = groupId;
this.subgroupId = subgroupId;
this.objectId = objectId;
this.publisherPriority = publisherPriority;
this.payload = payload;
}

async encode(w: Writer): Promise<void> {
await w.u53(this.groupId);
await w.u53(this.subgroupId);
await w.u53(this.objectId);
await w.u8(this.publisherPriority);
await w.u53(0); // no extension headers

if (this.payload !== undefined) {
await w.u53(this.payload.byteLength);
if (this.payload.byteLength === 0) {
await w.u53(0); // status = normal
} else {
await w.write(this.payload);
}
} else {
await w.u53(0); // no payload, length = 0
await w.u53(FETCH_END); // no payload, status = end
}
}

static async decode(r: Reader): Promise<FetchObject> {
const groupId = await r.u53();
const subgroupId = await r.u53();
const objectId = await r.u53();
const publisherPriority = await r.u8();
const payloadLength = await r.u53();

let payload: Uint8Array | undefined;
if (payloadLength === 0) {
const status = await r.u53();
if (status === 0) {
payload = new Uint8Array(0);
} else if (status === FETCH_END) {
payload = undefined;
} else {
throw new Error(`unexpected status: ${status}`);
}
} else {
payload = await r.read(payloadLength);
}

return new FetchObject(groupId, subgroupId, objectId, publisherPriority, payload);
}
}
Loading
Loading