Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions engine/sdks/typescript/runner/src/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import type { Logger } from "pino";
import type WebSocket from "ws";
import { logger, setLogger } from "./log.js";
import { stringifyCommandWrapper, stringifyEvent } from "./stringify";
import { Tunnel } from "./tunnel";
import {
calculateBackoff,
Expand Down Expand Up @@ -104,7 +105,7 @@
}

#actors: Map<string, ActorInstance> = new Map();
#actorWebSockets: Map<string, Set<WebSocketTunnelAdapter>> = new Map();

Check warning on line 108 in engine/sdks/typescript/runner/src/mod.ts

View workflow job for this annotation

GitHub Actions / quality

lint/correctness/noUnusedPrivateClassMembers

This private class member is defined but never used.

Check warning on line 108 in engine/sdks/typescript/runner/src/mod.ts

View workflow job for this annotation

GitHub Actions / quality

lint/correctness/noUnusedPrivateClassMembers

This private class member is defined but never used.

// WebSocket
#pegboardWebSocket?: WebSocket;
Expand Down Expand Up @@ -754,7 +755,7 @@
for (const commandWrapper of commands) {
this.log?.info({
msg: "received command",
commandWrapper,
command: stringifyCommandWrapper(commandWrapper),
});
if (commandWrapper.inner.tag === "CommandStartActor") {
this.#handleCommandStartActor(commandWrapper);
Expand Down Expand Up @@ -905,9 +906,8 @@

this.log?.info({
msg: "sending event to server",
index: eventWrapper.index,
tag: eventWrapper.inner.tag,
val: eventWrapper.inner.val,
event: stringifyEvent(eventWrapper.inner),
index: eventWrapper.index.toString(),
});

this.__sendToServer({
Expand Down Expand Up @@ -962,9 +962,8 @@

this.log?.info({
msg: "sending event to server",
index: eventWrapper.index,
tag: eventWrapper.inner.tag,
val: eventWrapper.inner.val,
event: stringifyEvent(eventWrapper.inner),
index: eventWrapper.index.toString(),
});

this.__sendToServer({
Expand Down
184 changes: 184 additions & 0 deletions engine/sdks/typescript/runner/src/stringify.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
import type * as protocol from "@rivetkit/engine-runner-protocol";

/**
* Helper function to stringify ArrayBuffer for logging
*/
function stringifyArrayBuffer(buffer: ArrayBuffer): string {
return `ArrayBuffer(${buffer.byteLength})`;
}

/**
* Helper function to stringify bigint for logging
*/
function stringifyBigInt(value: bigint): string {
return `${value}n`;
}

/**
* Helper function to stringify Map for logging
*/
function stringifyMap(map: ReadonlyMap<string, string>): string {
const entries = Array.from(map.entries())
.map(([k, v]) => `"${k}": "${v}"`)
.join(", ");
return `Map(${map.size}){${entries}}`;
}

/**
* Stringify ToServerTunnelMessageKind for logging
* Handles ArrayBuffers, BigInts, and Maps that can't be JSON.stringified
*/
export function stringifyToServerTunnelMessageKind(
kind: protocol.ToServerTunnelMessageKind,
): string {
switch (kind.tag) {
case "TunnelAck":
return "TunnelAck";
case "ToServerResponseStart": {
const { status, headers, body, stream } = kind.val;
const bodyStr = body === null ? "null" : stringifyArrayBuffer(body);
return `ToServerResponseStart{status: ${status}, headers: ${stringifyMap(headers)}, body: ${bodyStr}, stream: ${stream}}`;
}
case "ToServerResponseChunk": {
const { body, finish } = kind.val;
return `ToServerResponseChunk{body: ${stringifyArrayBuffer(body)}, finish: ${finish}}`;
}
case "ToServerResponseAbort":
return "ToServerResponseAbort";
case "ToServerWebSocketOpen": {
const { canHibernate, lastMsgIndex } = kind.val;
return `ToServerWebSocketOpen{canHibernate: ${canHibernate}, lastMsgIndex: ${stringifyBigInt(lastMsgIndex)}}`;
}
case "ToServerWebSocketMessage": {
const { data, binary } = kind.val;
return `ToServerWebSocketMessage{data: ${stringifyArrayBuffer(data)}, binary: ${binary}}`;
}
case "ToServerWebSocketMessageAck": {
const { index } = kind.val;
return `ToServerWebSocketMessageAck{index: ${index}}`;
}
case "ToServerWebSocketClose": {
const { code, reason, retry } = kind.val;
const codeStr = code === null ? "null" : code.toString();
const reasonStr = reason === null ? "null" : `"${reason}"`;
return `ToServerWebSocketClose{code: ${codeStr}, reason: ${reasonStr}, retry: ${retry}}`;
}
}
}

/**
* Stringify ToClientTunnelMessageKind for logging
* Handles ArrayBuffers, BigInts, and Maps that can't be JSON.stringified
*/
export function stringifyToClientTunnelMessageKind(
kind: protocol.ToClientTunnelMessageKind,
): string {
switch (kind.tag) {
case "TunnelAck":
return "TunnelAck";
case "ToClientRequestStart": {
const { actorId, method, path, headers, body, stream } = kind.val;
const bodyStr = body === null ? "null" : stringifyArrayBuffer(body);
return `ToClientRequestStart{actorId: "${actorId}", method: "${method}", path: "${path}", headers: ${stringifyMap(headers)}, body: ${bodyStr}, stream: ${stream}}`;
}
case "ToClientRequestChunk": {
const { body, finish } = kind.val;
return `ToClientRequestChunk{body: ${stringifyArrayBuffer(body)}, finish: ${finish}}`;
}
case "ToClientRequestAbort":
return "ToClientRequestAbort";
case "ToClientWebSocketOpen": {
const { actorId, path, headers } = kind.val;
return `ToClientWebSocketOpen{actorId: "${actorId}", path: "${path}", headers: ${stringifyMap(headers)}}`;
}
case "ToClientWebSocketMessage": {
const { index, data, binary } = kind.val;
return `ToClientWebSocketMessage{index: ${index}, data: ${stringifyArrayBuffer(data)}, binary: ${binary}}`;
}
case "ToClientWebSocketClose": {
const { code, reason } = kind.val;
const codeStr = code === null ? "null" : code.toString();
const reasonStr = reason === null ? "null" : `"${reason}"`;
return `ToClientWebSocketClose{code: ${codeStr}, reason: ${reasonStr}}`;
}
}
}

/**
* Stringify Command for logging
* Handles ArrayBuffers, BigInts, and Maps that can't be JSON.stringified
*/
export function stringifyCommand(command: protocol.Command): string {
switch (command.tag) {
case "CommandStartActor": {
const { actorId, generation, config } = command.val;
const keyStr = config.key === null ? "null" : `"${config.key}"`;
const inputStr =
config.input === null
? "null"
: stringifyArrayBuffer(config.input);
return `CommandStartActor{actorId: "${actorId}", generation: ${generation}, config: {name: "${config.name}", key: ${keyStr}, createTs: ${stringifyBigInt(config.createTs)}, input: ${inputStr}}}`;
}
case "CommandStopActor": {
const { actorId, generation } = command.val;
return `CommandStopActor{actorId: "${actorId}", generation: ${generation}}`;
}
}
}

/**
* Stringify CommandWrapper for logging
* Handles ArrayBuffers, BigInts, and Maps that can't be JSON.stringified
*/
export function stringifyCommandWrapper(
wrapper: protocol.CommandWrapper,
): string {
return `CommandWrapper{index: ${stringifyBigInt(wrapper.index)}, inner: ${stringifyCommand(wrapper.inner)}}`;
}

/**
* Stringify Event for logging
* Handles ArrayBuffers, BigInts, and Maps that can't be JSON.stringified
*/
export function stringifyEvent(event: protocol.Event): string {
switch (event.tag) {
case "EventActorIntent": {
const { actorId, generation, intent } = event.val;
const intentStr =
intent.tag === "ActorIntentSleep"
? "Sleep"
: intent.tag === "ActorIntentStop"
? "Stop"
: "Unknown";
return `EventActorIntent{actorId: "${actorId}", generation: ${generation}, intent: ${intentStr}}`;
}
case "EventActorStateUpdate": {
const { actorId, generation, state } = event.val;
let stateStr: string;
if (state.tag === "ActorStateRunning") {
stateStr = "Running";
} else if (state.tag === "ActorStateStopped") {
const { code, message } = state.val;
const messageStr = message === null ? "null" : `"${message}"`;
stateStr = `Stopped{code: ${code}, message: ${messageStr}}`;
} else {
stateStr = "Unknown";
}
return `EventActorStateUpdate{actorId: "${actorId}", generation: ${generation}, state: ${stateStr}}`;
}
case "EventActorSetAlarm": {
const { actorId, generation, alarmTs } = event.val;
const alarmTsStr =
alarmTs === null ? "null" : stringifyBigInt(alarmTs);
return `EventActorSetAlarm{actorId: "${actorId}", generation: ${generation}, alarmTs: ${alarmTsStr}}`;
}
}
}

/**
* Stringify EventWrapper for logging
* Handles ArrayBuffers, BigInts, and Maps that can't be JSON.stringified
*/
export function stringifyEventWrapper(wrapper: protocol.EventWrapper): string {
return `EventWrapper{index: ${stringifyBigInt(wrapper.index)}, inner: ${stringifyEvent(wrapper.inner)}}`;
}
16 changes: 11 additions & 5 deletions engine/sdks/typescript/runner/src/tunnel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,18 @@
import type { MessageId, RequestId } from "@rivetkit/engine-runner-protocol";
import type { Logger } from "pino";
import { stringify as uuidstringify, v4 as uuidv4 } from "uuid";
import { logger } from "./log";

Check warning on line 5 in engine/sdks/typescript/runner/src/tunnel.ts

View workflow job for this annotation

GitHub Actions / quality

lint/correctness/noUnusedImports

This import is unused.

Check warning on line 5 in engine/sdks/typescript/runner/src/tunnel.ts

View workflow job for this annotation

GitHub Actions / quality

lint/correctness/noUnusedImports

This import is unused.
import type { ActorInstance, Runner } from "./mod";
import {
stringifyToClientTunnelMessageKind,
stringifyToServerTunnelMessageKind,
} from "./stringify";
import { unreachable } from "./utils";
import { WebSocketTunnelAdapter } from "./websocket-tunnel-adapter";

const GC_INTERVAL = 60000; // 60 seconds
const MESSAGE_ACK_TIMEOUT = 5000; // 5 seconds
const WEBSOCKET_STATE_PERSIST_TIMEOUT = 30000; // 30 seconds

Check warning on line 16 in engine/sdks/typescript/runner/src/tunnel.ts

View workflow job for this annotation

GitHub Actions / quality

lint/correctness/noUnusedVariables

This variable WEBSOCKET_STATE_PERSIST_TIMEOUT is unused.

Check warning on line 16 in engine/sdks/typescript/runner/src/tunnel.ts

View workflow job for this annotation

GitHub Actions / quality

lint/correctness/noUnusedVariables

This variable WEBSOCKET_STATE_PERSIST_TIMEOUT is unused.

interface PendingRequest {
resolve: (response: Response) => void;
Expand Down Expand Up @@ -90,9 +94,11 @@
) {
// TODO: Switch this with runner WS
if (!this.#runner.__webSocketReady()) {
this.log?.warn(
"cannot send tunnel message, socket not connected to engine",
);
this.log?.warn({
msg: "cannot send tunnel message, socket not connected to engine",
requestId: idToStr(requestId),
message: stringifyToServerTunnelMessageKind(messageKind),
});
return;
}

Expand All @@ -110,7 +116,7 @@
msg: "send tunnel msg",
requestId: requestIdStr,
messageId: messageIdStr,
message: messageKind,
message: stringifyToServerTunnelMessageKind(messageKind),
});

// Send message
Expand Down Expand Up @@ -283,7 +289,7 @@
msg: "receive tunnel msg",
requestId: requestIdStr,
messageId: messageIdStr,
message: message.messageKind,
message: stringifyToClientTunnelMessageKind(message.messageKind),
});

if (message.messageKind.tag === "TunnelAck") {
Expand Down Expand Up @@ -629,8 +635,8 @@
headerInit[k] = v;
}
}
headerInit["Upgrade"] = "websocket";

Check notice on line 638 in engine/sdks/typescript/runner/src/tunnel.ts

View workflow job for this annotation

GitHub Actions / quality

lint/complexity/useLiteralKeys

The computed expression can be simplified without the use of a string literal.

Check notice on line 638 in engine/sdks/typescript/runner/src/tunnel.ts

View workflow job for this annotation

GitHub Actions / quality

lint/complexity/useLiteralKeys

The computed expression can be simplified without the use of a string literal.
headerInit["Connection"] = "Upgrade";

Check notice on line 639 in engine/sdks/typescript/runner/src/tunnel.ts

View workflow job for this annotation

GitHub Actions / quality

lint/complexity/useLiteralKeys

The computed expression can be simplified without the use of a string literal.

Check notice on line 639 in engine/sdks/typescript/runner/src/tunnel.ts

View workflow job for this annotation

GitHub Actions / quality

lint/complexity/useLiteralKeys

The computed expression can be simplified without the use of a string literal.

const request = new Request(`http://localhost${open.path}`, {
method: "GET",
Expand Down
Loading