From 1915f319a0f42cd738f0b14058fab080cfc2d4cd Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Tue, 4 Nov 2025 22:49:27 +0000 Subject: [PATCH] chore(rivetkit): add runner sse ping interval --- .../src/drivers/engine/actor-driver.ts | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts b/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts index c4c9462ce1..393711ff19 100644 --- a/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts +++ b/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts @@ -54,6 +54,8 @@ import { import { KEYS } from "./kv"; import { logger } from "./log"; +const RUNNER_SSE_PING_INTERVAL = 1000; + interface ActorHandler { actor?: AnyActorInstance; actorStartPromise?: ReturnType>; @@ -75,6 +77,7 @@ export class EngineActorDriver implements ActorDriver { #runnerStarted: PromiseWithResolvers = promiseWithResolvers(); #runnerStopped: PromiseWithResolvers = promiseWithResolvers(); + #isRunnerStopped: boolean = false; // WebSocket message acknowledgment debouncing #wsAckQueue: Map< @@ -150,6 +153,7 @@ export class EngineActorDriver implements ActorDriver { }, onShutdown: () => { this.#runnerStopped.resolve(undefined); + this.#isRunnerStopped = true; }, fetch: this.#runnerFetch.bind(this), websocket: this.#runnerWebSocket.bind(this), @@ -652,6 +656,29 @@ export class EngineActorDriver implements ActorDriver { invariant(payload, "runnerId not set"); await stream.writeSSE({ data: payload }); + // Send ping every second to keep the connection alive + while (true) { + if (this.#isRunnerStopped) { + logger().debug({ + msg: "runner is stopped", + }); + break; + } + + if (stream.closed || stream.aborted) { + logger().debug({ + msg: "runner sse stream closed", + closed: stream.closed, + aborted: stream.aborted, + }); + break; + } + + await stream.writeSSE({ event: "ping", data: "" }); + await stream.sleep(RUNNER_SSE_PING_INTERVAL); + } + + // Wait for the runner to stop if the SSE stream aborted early for any reason await this.#runnerStopped.promise; }); }