Skip to content

Commit 7c4e0f1

Browse files
committed
fix(rivetkit): gracefully call _onStop for actors on runner shutdown (#3390)
1 parent 8afd974 commit 7c4e0f1

File tree

1 file changed

+45
-0
lines changed

1 file changed

+45
-0
lines changed

rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -625,6 +625,51 @@ export class EngineActorDriver implements ActorDriver {
625625
async shutdownRunner(immediate: boolean): Promise<void> {
626626
logger().info({ msg: "stopping engine actor driver", immediate });
627627

628+
// TODO: We need to update the runner to have a draining state so:
629+
// 1. Send ToServerDraining
630+
// - This causes Pegboard to stop allocating actors to this runner
631+
// 2. Pegboard sends ToClientStopActor for all actors on this runner which handles the graceful migration of each actor independently
632+
// 3. Send ToServerStopping once all actors have successfully stopped
633+
//
634+
// What's happening right now is:
635+
// 1. All actors enter stopped state
636+
// 2. Actors still respond to requests because only RivetKit knows it's
637+
// stopping, this causes all requests to issue errors that the actor is
638+
// stopping. (This will NOT return a 503 bc the runner has no idea the
639+
// actors are stopping.)
640+
// 3. Once the last actor stops, then the runner finally stops + actors
641+
// reschedule
642+
//
643+
// This means that:
644+
// - All actors on this runner are bricked until the slowest _onStop finishes
645+
// - Guard will not gracefully handle requests bc it's not receiving a 503
646+
// - Actors can still be scheduled to this runner while the other
647+
// actors are stopping, meaning that those actors will NOT get _onStop
648+
// and will potentially corrupt their state
649+
//
650+
// HACK: Stop all actors to allow state to be saved
651+
// NOTE: _onStop is only supposed to be called by the runner, we're
652+
// abusing it here
653+
logger().debug({
654+
msg: "stopping all actors before shutdown",
655+
actorCount: this.#actors.size,
656+
});
657+
const stopPromises: Promise<void>[] = [];
658+
for (const [_actorId, handler] of this.#actors.entries()) {
659+
if (handler.actor) {
660+
stopPromises.push(
661+
handler.actor._onStop().catch((err) => {
662+
handler.actor?.rLog.error({
663+
msg: "_onStop errored",
664+
error: stringifyError(err),
665+
});
666+
}),
667+
);
668+
}
669+
}
670+
await Promise.all(stopPromises);
671+
logger().debug({ msg: "all actors stopped" });
672+
628673
// Clear the ack flush interval
629674
if (this.#wsAckFlushInterval) {
630675
clearInterval(this.#wsAckFlushInterval);

0 commit comments

Comments
 (0)