diff --git a/engine/sdks/typescript/runner/src/mod.ts b/engine/sdks/typescript/runner/src/mod.ts index dcdd2f85b7..305665b687 100644 --- a/engine/sdks/typescript/runner/src/mod.ts +++ b/engine/sdks/typescript/runner/src/mod.ts @@ -136,6 +136,30 @@ export class Runner { // Tunnel for HTTP/WebSocket forwarding #tunnel: Tunnel | undefined; + // Cached child logger with runner-specific attributes + #logCached?: Logger; + + get log(): Logger | undefined { + if (this.#logCached) return this.#logCached; + + const l = logger(); + if (l) { + // If has connected, create child logger with relevant metadata + // + // Otherwise, return default logger + if (this.runnerId) { + this.#logCached = l.child({ + runnerId: this.runnerId, + }); + return this.#logCached; + } else { + return l; + } + } + + return undefined; + } + constructor(config: RunnerConfig) { this.#config = config; if (this.#config.logger) setLogger(this.#config.logger); @@ -190,9 +214,8 @@ export class Runner { } #stopAllActors() { - logger()?.info({ + this.log?.info({ msg: "stopping all actors due to runner lost threshold exceeded", - runnerId: this.runnerId, }); const actorIds = Array.from(this.#actors.keys()); @@ -204,17 +227,15 @@ export class Runner { getActor(actorId: string, generation?: number): ActorInstance | undefined { const actor = this.#actors.get(actorId); if (!actor) { - logger()?.error({ + this.log?.error({ msg: "actor not found", - runnerId: this.runnerId, actorId, }); return undefined; } if (generation !== undefined && actor.generation !== generation) { - logger()?.error({ + this.log?.error({ msg: "actor generation mismatch", - runnerId: this.runnerId, actorId, generation, }); @@ -240,17 +261,15 @@ export class Runner { ): ActorInstance | undefined { const actor = this.#actors.get(actorId); if (!actor) { - logger()?.error({ + this.log?.error({ msg: "actor not found for removal", - runnerId: this.runnerId, actorId, }); return undefined; } if (generation !== undefined && actor.generation !== generation) { - logger()?.error({ + this.log?.error({ msg: "actor generation mismatch", - runnerId: this.runnerId, actorId, generation, }); @@ -267,7 +286,7 @@ export class Runner { if (this.#started) throw new Error("Cannot call runner.start twice"); this.#started = true; - logger()?.info({ msg: "starting runner" }); + this.log?.info({ msg: "starting runner" }); this.#tunnel = new Tunnel(this); this.#tunnel.start(); @@ -282,7 +301,7 @@ export class Runner { if (!this.#config.noAutoShutdown) { if (!SIGNAL_HANDLERS.length) { process.on("SIGTERM", () => { - logger()?.debug("received SIGTERM"); + this.log?.debug("received SIGTERM"); for (const handler of SIGNAL_HANDLERS) { handler(); @@ -291,7 +310,7 @@ export class Runner { process.exit(0); }); process.on("SIGINT", () => { - logger()?.debug("received SIGINT"); + this.log?.debug("received SIGINT"); for (const handler of SIGNAL_HANDLERS) { handler(); @@ -300,7 +319,7 @@ export class Runner { process.exit(0); }); - logger()?.debug({ + this.log?.debug({ msg: "added SIGTERM listeners", }); } @@ -314,9 +333,8 @@ export class Runner { // MARK: Shutdown async shutdown(immediate: boolean, exit: boolean = false) { - logger()?.info({ + this.log?.info({ msg: "starting shutdown", - runnerId: this.runnerId, immediate, exit, }); @@ -372,9 +390,8 @@ export class Runner { } else { // Wait for actors to shut down before stopping try { - logger()?.info({ + this.log?.info({ msg: "sending stopping message", - runnerId: this.runnerId, readyState: pegboardWebSocket.readyState, }); @@ -390,7 +407,7 @@ export class Runner { ) { this.#pegboardWebSocket.send(encoded); } else { - logger()?.error( + this.log?.error( "WebSocket not available or not open for sending data", ); } @@ -400,9 +417,8 @@ export class Runner { throw new Error("missing pegboardWebSocket"); pegboardWebSocket.addEventListener("close", (ev) => { - logger()?.info({ + this.log?.info({ msg: "connection closed", - runnerId: this.runnerId, code: ev.code, reason: ev.reason.toString(), }); @@ -412,22 +428,19 @@ export class Runner { // TODO: Wait for all actors to stop before closing ws - logger()?.info({ + this.log?.info({ msg: "closing WebSocket", - runnerId: this.runnerId, }); pegboardWebSocket.close(1000, "Stopping"); await closePromise; - logger()?.info({ + this.log?.info({ msg: "websocket shutdown completed", - runnerId: this.runnerId, }); } catch (error) { - logger()?.error({ + this.log?.error({ msg: "error during websocket shutdown:", - runnerId: this.runnerId, error, }); pegboardWebSocket.close(); @@ -436,9 +449,8 @@ export class Runner { } else { // This is often logged when the serverless SSE stream closes after // the runner has already shut down - logger()?.debug({ + this.log?.debug({ msg: "no runner WebSocket to shutdown or already closed", - runnerId: this.runnerId, readyState: this.#pegboardWebSocket?.readyState, }); } @@ -480,7 +492,7 @@ export class Runner { const ws = new WS(this.pegboardUrl, protocols) as any as WebSocket; this.#pegboardWebSocket = ws; - logger()?.info({ + this.log?.info({ msg: "connecting", endpoint: this.pegboardEndpoint, namespace: this.#config.namespace, @@ -490,14 +502,14 @@ export class Runner { ws.addEventListener("open", () => { if (this.#reconnectAttempt > 0) { - logger()?.info({ + this.log?.info({ msg: "runner reconnected", namespace: this.#config.namespace, runnerName: this.#config.runnerName, reconnectAttempt: this.#reconnectAttempt, }); } else { - logger()?.debug({ + this.log?.debug({ msg: "runner connected", namespace: this.#config.namespace, runnerName: this.#config.runnerName, @@ -558,9 +570,8 @@ export class Runner { }); } else { clearInterval(pingLoop); - logger()?.info({ + this.log?.info({ msg: "WebSocket not open, stopping ping loop", - runnerId: this.runnerId, }); } }, RUNNER_PING_INTERVAL); @@ -573,9 +584,8 @@ export class Runner { this.#sendCommandAcknowledgment(); } else { clearInterval(ackLoop); - logger()?.info({ + this.log?.info({ msg: "WebSocket not open, stopping ack loop", - runnerId: this.runnerId, }); } }, ackInterval); @@ -611,9 +621,8 @@ export class Runner { ? Number(init.metadata.runnerLostThreshold) : undefined; - logger()?.info({ + this.log?.info({ msg: "received init", - runnerId: init.runnerId, lastEventIdx: init.lastEventIdx, runnerLostThreshold: this.#runnerLostThreshold, }); @@ -641,9 +650,8 @@ export class Runner { }); ws.addEventListener("error", (ev) => { - logger()?.error({ + this.log?.error({ msg: `WebSocket error: ${ev.error}`, - runnerId: this.runnerId, }); if (!this.#shutdown) { @@ -653,9 +661,8 @@ export class Runner { this.#runnerLostThreshold && this.#runnerLostThreshold > 0 ) { - logger()?.info({ + this.log?.info({ msg: "starting runner lost timeout", - runnerId: this.runnerId, seconds: this.#runnerLostThreshold / 1000, }); this.#runnerLostTimeout = setTimeout(() => { @@ -674,16 +681,15 @@ export class Runner { closeError?.group === "ws" && closeError?.error === "eviction" ) { - logger()?.info({ + this.log?.info({ msg: "runner evicted", - runnerId: this.runnerId, }); this.#config.onDisconnected(ev.code, ev.reason); await this.shutdown(true); } else { - logger()?.warn({ + this.log?.warn({ msg: "runner disconnected", namespace: this.#config.namespace, runnerName: this.#config.runnerName, @@ -714,9 +720,8 @@ export class Runner { this.#runnerLostThreshold && this.#runnerLostThreshold > 0 ) { - logger()?.info({ + this.log?.info({ msg: "starting runner lost timeout", - runnerId: this.runnerId, seconds: this.#runnerLostThreshold / 1000, }); this.#runnerLostTimeout = setTimeout(() => { @@ -731,16 +736,14 @@ export class Runner { } #handleCommands(commands: protocol.ToClientCommands) { - logger()?.info({ + this.log?.info({ msg: "received commands", - runnerId: this.runnerId, commandCount: commands.length, }); for (const commandWrapper of commands) { - logger()?.info({ + this.log?.info({ msg: "received command", - runnerId: this.runnerId, commandWrapper, }); if (commandWrapper.inner.tag === "CommandStartActor") { @@ -765,9 +768,8 @@ export class Runner { const prunedCount = originalLength - this.#eventHistory.length; if (prunedCount > 0) { - logger()?.info({ + this.log?.info({ msg: "pruned acknowledged events", - runnerId: this.runnerId, lastAckedIdx: lastAckedIdx.toString(), prunedCount, }); @@ -787,9 +789,8 @@ export class Runner { !this.#eventBacklogWarned ) { this.#eventBacklogWarned = true; - logger()?.warn({ + this.log?.warn({ msg: "unacknowledged event backlog exceeds threshold", - runnerId: this.runnerId, backlogSize: this.#eventHistory.length, threshold: EVENT_BACKLOG_WARN_THRESHOLD, }); @@ -828,9 +829,8 @@ export class Runner { this.#config .onActorStart(actorId, generation, actorConfig) .catch((err) => { - logger()?.error({ + this.log?.error({ msg: "error in onactorstart for actor", - runnerId: this.runnerId, actorId, err, }); @@ -857,9 +857,8 @@ export class Runner { intentType: "sleep" | "stop", ) { if (this.#shutdown) { - logger()?.warn({ + this.log?.warn({ msg: "Runner is shut down, cannot send actor intent", - runnerId: this.runnerId, }); return; } @@ -893,9 +892,8 @@ export class Runner { this.#recordEvent(eventWrapper); - logger()?.info({ + this.log?.info({ msg: "sending event to server", - runnerId: this.runnerId, index: eventWrapper.index, tag: eventWrapper.inner.tag, val: eventWrapper.inner.val, @@ -913,9 +911,8 @@ export class Runner { stateType: "running" | "stopped", ) { if (this.#shutdown) { - logger()?.warn({ + this.log?.warn({ msg: "Runner is shut down, cannot send actor state update", - runnerId: this.runnerId, }); return; } @@ -952,9 +949,8 @@ export class Runner { this.#recordEvent(eventWrapper); - logger()?.info({ + this.log?.info({ msg: "sending event to server", - runnerId: this.runnerId, index: eventWrapper.index, tag: eventWrapper.inner.tag, val: eventWrapper.inner.val, @@ -968,9 +964,8 @@ export class Runner { #sendCommandAcknowledgment() { if (this.#shutdown) { - logger()?.warn({ + this.log?.warn({ msg: "Runner is shut down, cannot send command acknowledgment", - runnerId: this.runnerId, }); return; } @@ -980,7 +975,7 @@ export class Runner { return; } - //logger()?.log("Sending command acknowledgment", this.#lastCommandIdx); + //this.#log?.log("Sending command acknowledgment", this.#lastCommandIdx); this.__sendToServer({ tag: "ToServerAckCommands", @@ -995,8 +990,10 @@ export class Runner { const request = this.#kvRequests.get(requestId); if (!request) { - const msg = "received kv response for unknown request id"; - logger()?.error({ msg, runnerId: this.runnerId, requestId }); + this.log?.error({ + msg: "received kv response for unknown request id", + requestId, + }); return; } @@ -1389,7 +1386,7 @@ export class Runner { } if (processedCount > 0) { - //logger()?.log(`Processed ${processedCount} queued KV requests`); + //this.#log?.log(`Processed ${processedCount} queued KV requests`); } } @@ -1401,9 +1398,8 @@ export class Runner { __sendToServer(message: protocol.ToServer) { if (this.#shutdown) { - logger()?.warn({ + this.log?.warn({ msg: "Runner is shut down, cannot send message to server", - runnerId: this.runnerId, }); return; } @@ -1415,9 +1411,8 @@ export class Runner { ) { this.#pegboardWebSocket.send(encoded); } else { - logger()?.error({ + this.log?.error({ msg: "WebSocket not available or not open for sending data", - runnerId: this.runnerId, }); } } @@ -1446,9 +1441,8 @@ export class Runner { #scheduleReconnect() { if (this.#shutdown) { - logger()?.debug({ + this.log?.debug({ msg: "Runner is shut down, not attempting reconnect", - runnerId: this.runnerId, }); return; } @@ -1460,17 +1454,15 @@ export class Runner { jitter: true, }); - logger()?.debug({ + this.log?.debug({ msg: `Scheduling reconnect attempt ${this.#reconnectAttempt + 1} in ${delay}ms`, - runnerId: this.runnerId, }); this.#reconnectTimeout = setTimeout(async () => { if (!this.#shutdown) { this.#reconnectAttempt++; - logger()?.debug({ + this.log?.debug({ msg: `Attempting to reconnect (attempt ${this.#reconnectAttempt})...`, - runnerId: this.runnerId, }); await this.#openPegboardWebSocket(); } @@ -1484,7 +1476,7 @@ export class Runner { if (eventsToResend.length === 0) return; - //logger()?.log( + //this.#log?.log( // `Resending ${eventsToResend.length} unacknowledged events from index ${Number(lastEventIdx) + 1}`, //); @@ -1515,7 +1507,7 @@ export class Runner { } if (toDelete.length > 0) { - //logger()?.log(`Cleaned up ${toDelete.length} expired KV requests`); + //this.#log?.log(`Cleaned up ${toDelete.length} expired KV requests`); } } } diff --git a/engine/sdks/typescript/runner/src/tunnel.ts b/engine/sdks/typescript/runner/src/tunnel.ts index c99a0d8e72..5b34c36c95 100644 --- a/engine/sdks/typescript/runner/src/tunnel.ts +++ b/engine/sdks/typescript/runner/src/tunnel.ts @@ -1,5 +1,6 @@ import type * as protocol from "@rivetkit/engine-runner-protocol"; import type { MessageId, RequestId } from "@rivetkit/engine-runner-protocol"; +import type { Logger } from "pino"; import { stringify as uuidstringify, v4 as uuidv4 } from "uuid"; import { logger } from "./log"; import type { ActorInstance, Runner } from "./mod"; @@ -35,6 +36,10 @@ export class Tunnel { #gcInterval?: NodeJS.Timeout; + get log(): Logger | undefined { + return this.#runner.log; + } + constructor(runner: Runner) { this.#runner = runner; } @@ -68,7 +73,7 @@ export class Tunnel { ) { // TODO: Switch this with runner WS if (!this.#runner.__webSocketReady()) { - logger()?.warn( + this.log?.warn( "cannot send tunnel message, socket not connected to engine", ); return; @@ -84,7 +89,7 @@ export class Tunnel { requestIdStr, }); - logger()?.debug({ + this.log?.debug({ msg: "send tunnel msg", requestId: requestIdStr, messageId: messageIdStr, @@ -117,7 +122,7 @@ export class Tunnel { }, }; - logger()?.debug({ + this.log?.debug({ msg: "ack tunnel msg", requestId: idToStr(requestId), messageId: idToStr(messageId), @@ -184,7 +189,7 @@ export class Tunnel { // Remove timed out messages if (messagesToDelete.length > 0) { - logger()?.warn({ + this.log?.warn({ msg: "purging unacked tunnel messages, this indicates that the Rivet Engine is disconnected or not responding", count: messagesToDelete.length, }); @@ -225,7 +230,7 @@ export class Tunnel { ): Promise { // Validate actor exists if (!this.#runner.hasActor(actorId)) { - logger()?.warn({ + this.log?.warn({ msg: "ignoring request for unknown actor", actorId, }); @@ -257,7 +262,7 @@ export class Tunnel { async handleTunnelMessage(message: protocol.ToClientTunnelMessage) { const requestIdStr = idToStr(message.requestId); const messageIdStr = idToStr(message.messageId); - logger()?.debug({ + this.log?.debug({ msg: "receive tunnel msg", requestId: requestIdStr, messageId: messageIdStr, @@ -271,7 +276,7 @@ export class Tunnel { const didDelete = this.#pendingTunnelMessages.delete(messageIdStr); if (!didDelete) { - logger()?.warn({ + this.log?.warn({ msg: "received tunnel ack for nonexistent message", requestId: requestIdStr, messageId: messageIdStr, @@ -402,7 +407,7 @@ export class Tunnel { await this.#sendResponse(requestId, response); } } catch (error) { - logger()?.error({ msg: "error handling request", error }); + this.log?.error({ msg: "error handling request", error }); this.#sendResponseError(requestId, 500, "Internal Server Error"); } finally { // Clean up request tracking @@ -497,7 +502,7 @@ export class Tunnel { // Validate actor exists const actor = this.#runner.getActor(open.actorId); if (!actor) { - logger()?.warn({ + this.log?.warn({ msg: "ignoring websocket for unknown actor", actorId: open.actorId, }); @@ -521,7 +526,7 @@ export class Tunnel { const websocketHandler = this.#runner.config.websocket; if (!websocketHandler) { - logger()?.error({ + this.log?.error({ msg: "no websocket handler configured for tunnel", }); // Send close immediately @@ -633,7 +638,7 @@ export class Tunnel { request, ); } catch (error) { - logger()?.error({ msg: "error handling websocket open", error }); + this.log?.error({ msg: "error handling websocket open", error }); // Send close on error this.#sendMessage(requestId, { tag: "ToServerWebSocketClose", @@ -677,7 +682,7 @@ export class Tunnel { } __ackWebsocketMessage(requestId: ArrayBuffer, index: number) { - logger()?.debug({ + this.log?.debug({ msg: "ack ws msg", requestId: idToStr(requestId), index,