diff --git a/engine/sdks/typescript/runner/src/mod.ts b/engine/sdks/typescript/runner/src/mod.ts index c80a201649..0c1b33137d 100644 --- a/engine/sdks/typescript/runner/src/mod.ts +++ b/engine/sdks/typescript/runner/src/mod.ts @@ -2,6 +2,7 @@ import * as protocol from "@rivetkit/engine-runner-protocol"; import type { Logger } from "pino"; import type WebSocket from "ws"; import { logger, setLogger } from "./log.js"; +import { stringifyCommandWrapper, stringifyEvent } from "./stringify"; import { Tunnel } from "./tunnel"; import { calculateBackoff, @@ -754,7 +755,7 @@ export class Runner { for (const commandWrapper of commands) { this.log?.info({ msg: "received command", - commandWrapper, + command: stringifyCommandWrapper(commandWrapper), }); if (commandWrapper.inner.tag === "CommandStartActor") { this.#handleCommandStartActor(commandWrapper); @@ -905,9 +906,8 @@ export class Runner { this.log?.info({ msg: "sending event to server", - index: eventWrapper.index, - tag: eventWrapper.inner.tag, - val: eventWrapper.inner.val, + event: stringifyEvent(eventWrapper.inner), + index: eventWrapper.index.toString(), }); this.__sendToServer({ @@ -962,9 +962,8 @@ export class Runner { this.log?.info({ msg: "sending event to server", - index: eventWrapper.index, - tag: eventWrapper.inner.tag, - val: eventWrapper.inner.val, + event: stringifyEvent(eventWrapper.inner), + index: eventWrapper.index.toString(), }); this.__sendToServer({ diff --git a/engine/sdks/typescript/runner/src/stringify.ts b/engine/sdks/typescript/runner/src/stringify.ts new file mode 100644 index 0000000000..699b2745c1 --- /dev/null +++ b/engine/sdks/typescript/runner/src/stringify.ts @@ -0,0 +1,184 @@ +import type * as protocol from "@rivetkit/engine-runner-protocol"; + +/** + * Helper function to stringify ArrayBuffer for logging + */ +function stringifyArrayBuffer(buffer: ArrayBuffer): string { + return `ArrayBuffer(${buffer.byteLength})`; +} + +/** + * Helper function to stringify bigint for logging + */ +function stringifyBigInt(value: bigint): string { + return `${value}n`; +} + +/** + * Helper function to stringify Map for logging + */ +function stringifyMap(map: ReadonlyMap): string { + const entries = Array.from(map.entries()) + .map(([k, v]) => `"${k}": "${v}"`) + .join(", "); + return `Map(${map.size}){${entries}}`; +} + +/** + * Stringify ToServerTunnelMessageKind for logging + * Handles ArrayBuffers, BigInts, and Maps that can't be JSON.stringified + */ +export function stringifyToServerTunnelMessageKind( + kind: protocol.ToServerTunnelMessageKind, +): string { + switch (kind.tag) { + case "TunnelAck": + return "TunnelAck"; + case "ToServerResponseStart": { + const { status, headers, body, stream } = kind.val; + const bodyStr = body === null ? "null" : stringifyArrayBuffer(body); + return `ToServerResponseStart{status: ${status}, headers: ${stringifyMap(headers)}, body: ${bodyStr}, stream: ${stream}}`; + } + case "ToServerResponseChunk": { + const { body, finish } = kind.val; + return `ToServerResponseChunk{body: ${stringifyArrayBuffer(body)}, finish: ${finish}}`; + } + case "ToServerResponseAbort": + return "ToServerResponseAbort"; + case "ToServerWebSocketOpen": { + const { canHibernate, lastMsgIndex } = kind.val; + return `ToServerWebSocketOpen{canHibernate: ${canHibernate}, lastMsgIndex: ${stringifyBigInt(lastMsgIndex)}}`; + } + case "ToServerWebSocketMessage": { + const { data, binary } = kind.val; + return `ToServerWebSocketMessage{data: ${stringifyArrayBuffer(data)}, binary: ${binary}}`; + } + case "ToServerWebSocketMessageAck": { + const { index } = kind.val; + return `ToServerWebSocketMessageAck{index: ${index}}`; + } + case "ToServerWebSocketClose": { + const { code, reason, retry } = kind.val; + const codeStr = code === null ? "null" : code.toString(); + const reasonStr = reason === null ? "null" : `"${reason}"`; + return `ToServerWebSocketClose{code: ${codeStr}, reason: ${reasonStr}, retry: ${retry}}`; + } + } +} + +/** + * Stringify ToClientTunnelMessageKind for logging + * Handles ArrayBuffers, BigInts, and Maps that can't be JSON.stringified + */ +export function stringifyToClientTunnelMessageKind( + kind: protocol.ToClientTunnelMessageKind, +): string { + switch (kind.tag) { + case "TunnelAck": + return "TunnelAck"; + case "ToClientRequestStart": { + const { actorId, method, path, headers, body, stream } = kind.val; + const bodyStr = body === null ? "null" : stringifyArrayBuffer(body); + return `ToClientRequestStart{actorId: "${actorId}", method: "${method}", path: "${path}", headers: ${stringifyMap(headers)}, body: ${bodyStr}, stream: ${stream}}`; + } + case "ToClientRequestChunk": { + const { body, finish } = kind.val; + return `ToClientRequestChunk{body: ${stringifyArrayBuffer(body)}, finish: ${finish}}`; + } + case "ToClientRequestAbort": + return "ToClientRequestAbort"; + case "ToClientWebSocketOpen": { + const { actorId, path, headers } = kind.val; + return `ToClientWebSocketOpen{actorId: "${actorId}", path: "${path}", headers: ${stringifyMap(headers)}}`; + } + case "ToClientWebSocketMessage": { + const { index, data, binary } = kind.val; + return `ToClientWebSocketMessage{index: ${index}, data: ${stringifyArrayBuffer(data)}, binary: ${binary}}`; + } + case "ToClientWebSocketClose": { + const { code, reason } = kind.val; + const codeStr = code === null ? "null" : code.toString(); + const reasonStr = reason === null ? "null" : `"${reason}"`; + return `ToClientWebSocketClose{code: ${codeStr}, reason: ${reasonStr}}`; + } + } +} + +/** + * Stringify Command for logging + * Handles ArrayBuffers, BigInts, and Maps that can't be JSON.stringified + */ +export function stringifyCommand(command: protocol.Command): string { + switch (command.tag) { + case "CommandStartActor": { + const { actorId, generation, config } = command.val; + const keyStr = config.key === null ? "null" : `"${config.key}"`; + const inputStr = + config.input === null + ? "null" + : stringifyArrayBuffer(config.input); + return `CommandStartActor{actorId: "${actorId}", generation: ${generation}, config: {name: "${config.name}", key: ${keyStr}, createTs: ${stringifyBigInt(config.createTs)}, input: ${inputStr}}}`; + } + case "CommandStopActor": { + const { actorId, generation } = command.val; + return `CommandStopActor{actorId: "${actorId}", generation: ${generation}}`; + } + } +} + +/** + * Stringify CommandWrapper for logging + * Handles ArrayBuffers, BigInts, and Maps that can't be JSON.stringified + */ +export function stringifyCommandWrapper( + wrapper: protocol.CommandWrapper, +): string { + return `CommandWrapper{index: ${stringifyBigInt(wrapper.index)}, inner: ${stringifyCommand(wrapper.inner)}}`; +} + +/** + * Stringify Event for logging + * Handles ArrayBuffers, BigInts, and Maps that can't be JSON.stringified + */ +export function stringifyEvent(event: protocol.Event): string { + switch (event.tag) { + case "EventActorIntent": { + const { actorId, generation, intent } = event.val; + const intentStr = + intent.tag === "ActorIntentSleep" + ? "Sleep" + : intent.tag === "ActorIntentStop" + ? "Stop" + : "Unknown"; + return `EventActorIntent{actorId: "${actorId}", generation: ${generation}, intent: ${intentStr}}`; + } + case "EventActorStateUpdate": { + const { actorId, generation, state } = event.val; + let stateStr: string; + if (state.tag === "ActorStateRunning") { + stateStr = "Running"; + } else if (state.tag === "ActorStateStopped") { + const { code, message } = state.val; + const messageStr = message === null ? "null" : `"${message}"`; + stateStr = `Stopped{code: ${code}, message: ${messageStr}}`; + } else { + stateStr = "Unknown"; + } + return `EventActorStateUpdate{actorId: "${actorId}", generation: ${generation}, state: ${stateStr}}`; + } + case "EventActorSetAlarm": { + const { actorId, generation, alarmTs } = event.val; + const alarmTsStr = + alarmTs === null ? "null" : stringifyBigInt(alarmTs); + return `EventActorSetAlarm{actorId: "${actorId}", generation: ${generation}, alarmTs: ${alarmTsStr}}`; + } + } +} + +/** + * Stringify EventWrapper for logging + * Handles ArrayBuffers, BigInts, and Maps that can't be JSON.stringified + */ +export function stringifyEventWrapper(wrapper: protocol.EventWrapper): string { + return `EventWrapper{index: ${stringifyBigInt(wrapper.index)}, inner: ${stringifyEvent(wrapper.inner)}}`; +} diff --git a/engine/sdks/typescript/runner/src/tunnel.ts b/engine/sdks/typescript/runner/src/tunnel.ts index 5c62b6b5dd..5ca1326f57 100644 --- a/engine/sdks/typescript/runner/src/tunnel.ts +++ b/engine/sdks/typescript/runner/src/tunnel.ts @@ -4,6 +4,10 @@ import type { Logger } from "pino"; import { stringify as uuidstringify, v4 as uuidv4 } from "uuid"; import { logger } from "./log"; import type { ActorInstance, Runner } from "./mod"; +import { + stringifyToClientTunnelMessageKind, + stringifyToServerTunnelMessageKind, +} from "./stringify"; import { unreachable } from "./utils"; import { WebSocketTunnelAdapter } from "./websocket-tunnel-adapter"; @@ -90,9 +94,11 @@ export class Tunnel { ) { // TODO: Switch this with runner WS if (!this.#runner.__webSocketReady()) { - this.log?.warn( - "cannot send tunnel message, socket not connected to engine", - ); + this.log?.warn({ + msg: "cannot send tunnel message, socket not connected to engine", + requestId: idToStr(requestId), + message: stringifyToServerTunnelMessageKind(messageKind), + }); return; } @@ -110,7 +116,7 @@ export class Tunnel { msg: "send tunnel msg", requestId: requestIdStr, messageId: messageIdStr, - message: messageKind, + message: stringifyToServerTunnelMessageKind(messageKind), }); // Send message @@ -283,7 +289,7 @@ export class Tunnel { msg: "receive tunnel msg", requestId: requestIdStr, messageId: messageIdStr, - message: message.messageKind, + message: stringifyToClientTunnelMessageKind(message.messageKind), }); if (message.messageKind.tag === "TunnelAck") {