Skip to content

Commit e9b6bde

Browse files
committed
chore(rivetkit): add runner sse ping interval
1 parent a0b95ae commit e9b6bde

File tree

1 file changed

+27
-0
lines changed

1 file changed

+27
-0
lines changed

rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ import {
5454
import { KEYS } from "./kv";
5555
import { logger } from "./log";
5656

57+
const RUNNER_SSE_PING_INTERVAL = 1000;
58+
5759
interface ActorHandler {
5860
actor?: AnyActorInstance;
5961
actorStartPromise?: ReturnType<typeof promiseWithResolvers<void>>;
@@ -75,6 +77,7 @@ export class EngineActorDriver implements ActorDriver {
7577

7678
#runnerStarted: PromiseWithResolvers<undefined> = promiseWithResolvers();
7779
#runnerStopped: PromiseWithResolvers<undefined> = promiseWithResolvers();
80+
#isRunnerStopped: boolean = false;
7881

7982
// WebSocket message acknowledgment debouncing
8083
#wsAckQueue: Map<
@@ -150,6 +153,7 @@ export class EngineActorDriver implements ActorDriver {
150153
},
151154
onShutdown: () => {
152155
this.#runnerStopped.resolve(undefined);
156+
this.#isRunnerStopped = true;
153157
},
154158
fetch: this.#runnerFetch.bind(this),
155159
websocket: this.#runnerWebSocket.bind(this),
@@ -652,6 +656,29 @@ export class EngineActorDriver implements ActorDriver {
652656
invariant(payload, "runnerId not set");
653657
await stream.writeSSE({ data: payload });
654658

659+
// Send ping every second to keep the connection alive
660+
while (true) {
661+
if (this.#isRunnerStopped) {
662+
logger().debug({
663+
msg: "runner is stopped",
664+
});
665+
break;
666+
}
667+
668+
if (stream.closed || stream.aborted) {
669+
logger().debug({
670+
msg: "runner sse stream closed",
671+
closed: stream.closed,
672+
aborted: stream.aborted,
673+
});
674+
break;
675+
}
676+
677+
await stream.writeSSE({ event: "ping", data: "" });
678+
await stream.sleep(RUNNER_SSE_PING_INTERVAL);
679+
}
680+
681+
// Wait for the runner to stop if the SSE stream aborted early for any reason
655682
await this.#runnerStopped.promise;
656683
});
657684
}

0 commit comments

Comments
 (0)