Skip to content

Commit 3842c8f

Browse files
committed
fix(rivetkit): prevent sleeping if there are any action hono requests, fixes inspector requests not resetiting sleep timer
1 parent fe12a10 commit 3842c8f

File tree

7 files changed

+68
-34
lines changed

7 files changed

+68
-34
lines changed

examples/kitchen-sink/src/backend/actors/demo.ts

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rivetkit-typescript/packages/rivetkit/src/actor/driver.ts

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,17 @@ export interface ActorDriver {
3535
*/
3636
getDatabase(actorId: string): Promise<unknown | undefined>;
3737

38-
sleep?(actorId: string): Promise<void>;
38+
/**
39+
* Requests the actor to go to sleep.
40+
*
41+
* This will call `_stop` independently.
42+
*/
43+
startSleep?(actorId: string): void;
3944

40-
shutdown?(immediate: boolean): Promise<void>;
45+
/**
46+
* Shuts down the actor runner.
47+
*/
48+
shutdownRunner?(immediate: boolean): Promise<void>;
4149

4250
// Serverless
4351
/** This handles the serverless start request. This should manage the lifecycle of the runner tied to the request lifecycle. */

rivetkit-typescript/packages/rivetkit/src/actor/instance.ts

Lines changed: 39 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import { TO_CLIENT_VERSIONED } from "@/schemas/client-protocol/versioned";
1717
import {
1818
arrayBuffersEqual,
1919
bufferToArrayBuffer,
20+
EXTRA_ERROR_LOG,
2021
getEnvUniversal,
2122
promiseWithResolvers,
2223
SinglePromiseQueue,
@@ -194,8 +195,11 @@ export class ActorInstance<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
194195

195196
#sleepTimeout?: NodeJS.Timeout;
196197

197-
// Track active raw requests so sleep logic can account for them
198-
#activeRawFetchCount = 0;
198+
/**
199+
* Track active HTTP requests through Hono router so sleep logic can
200+
* account for them. Does not include WebSockets.
201+
**/
202+
#activeHonoHttpRequests = 0;
199203
#activeRawWebSockets = new Set<UniversalWebSocket>();
200204

201205
#schedule!: Schedule;
@@ -287,7 +291,7 @@ export class ActorInstance<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
287291
}
288292

289293
get #sleepingSupported(): boolean {
290-
return this.#actorDriver.sleep !== undefined;
294+
return this.#actorDriver.startSleep !== undefined;
291295
}
292296

293297
/**
@@ -1517,10 +1521,6 @@ export class ActorInstance<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
15171521
throw new errors.FetchHandlerNotDefined();
15181522
}
15191523

1520-
// Track active raw fetch while handler runs
1521-
this.#activeRawFetchCount++;
1522-
this.#resetSleepTimer();
1523-
15241524
try {
15251525
const response = await this.#config.onFetch(
15261526
this.actorContext,
@@ -1538,12 +1538,6 @@ export class ActorInstance<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
15381538
});
15391539
throw error;
15401540
} finally {
1541-
// Decrement active raw fetch counter and re-evaluate sleep
1542-
this.#activeRawFetchCount = Math.max(
1543-
0,
1544-
this.#activeRawFetchCount - 1,
1545-
);
1546-
this.#resetSleepTimer();
15471541
this.#savePersistThrottled();
15481542
}
15491543
}
@@ -1880,6 +1874,29 @@ export class ActorInstance<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
18801874
}
18811875
}
18821876

1877+
/**
1878+
* Called by router middleware when an HTTP request begins.
1879+
*/
1880+
__beginHonoHttpRequest() {
1881+
this.#activeHonoHttpRequests++;
1882+
this.#resetSleepTimer();
1883+
}
1884+
1885+
/**
1886+
* Called by router middleware when an HTTP request ends.
1887+
*/
1888+
__endHonoHttpRequest() {
1889+
this.#activeHonoHttpRequests--;
1890+
if (this.#activeHonoHttpRequests < 0) {
1891+
this.#activeHonoHttpRequests = 0;
1892+
this.#rLog.warn({
1893+
msg: "active hono requests went below 0, this is a RivetKit bug",
1894+
...EXTRA_ERROR_LOG,
1895+
});
1896+
}
1897+
this.#resetSleepTimer();
1898+
}
1899+
18831900
// MARK: Sleep
18841901
/**
18851902
* Reset timer from the last actor interaction that allows it to be put to sleep.
@@ -1900,6 +1917,7 @@ export class ActorInstance<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
19001917
msg: "resetting sleep timer",
19011918
canSleep,
19021919
existingTimeout: !!this.#sleepTimeout,
1920+
timeout: this.#config.options.sleepTimeout,
19031921
});
19041922

19051923
if (this.#sleepTimeout) {
@@ -1912,12 +1930,7 @@ export class ActorInstance<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
19121930

19131931
if (canSleep) {
19141932
this.#sleepTimeout = setTimeout(() => {
1915-
this._sleep().catch((error) => {
1916-
this.#rLog.error({
1917-
msg: "error during sleep",
1918-
error: stringifyError(error),
1919-
});
1920-
});
1933+
this._sleep();
19211934
}, this.#config.options.sleepTimeout);
19221935
}
19231936
}
@@ -1935,18 +1948,19 @@ export class ActorInstance<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
19351948
if (conn.status === "connected") return false;
19361949
}
19371950

1938-
// Do not sleep if raw fetches are in-flight
1939-
if (this.#activeRawFetchCount > 0) return false;
1951+
// Do not sleep if Hono HTTP requests are in-flight
1952+
if (this.#activeHonoHttpRequests > 0) return false;
19401953

1954+
// TODO: When WS hibernation is ready, update this to only count non-hibernatable websockets
19411955
// Do not sleep if there are raw websockets open
19421956
if (this.#activeRawWebSockets.size > 0) return false;
19431957

19441958
return true;
19451959
}
19461960

19471961
/** Puts an actor to sleep. This should just start the sleep sequence, most shutdown logic should be in _stop (which is called by the ActorDriver when sleeping). */
1948-
async _sleep() {
1949-
const sleep = this.#actorDriver.sleep?.bind(
1962+
_sleep() {
1963+
const sleep = this.#actorDriver.startSleep?.bind(
19501964
this.#actorDriver,
19511965
this.#actorId,
19521966
);
@@ -1962,11 +1976,11 @@ export class ActorInstance<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
19621976
this.#rLog.info({ msg: "actor sleeping" });
19631977

19641978
// Schedule sleep to happen on the next tick. This allows for any action that calls _sleep to complete.
1965-
setImmediate(async () => {
1979+
setImmediate(() => {
19661980
// The actor driver should call stop when ready to stop
19671981
//
19681982
// This will call _stop once Pegboard responds with the new status
1969-
await sleep();
1983+
sleep();
19701984
});
19711985
}
19721986

rivetkit-typescript/packages/rivetkit/src/actor/router.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,17 @@ export function createActorRouter(
7878

7979
router.use("*", loggerMiddleware(loggerWithoutContext()));
8080

81+
// Track all HTTP requests to prevent actor from sleeping during active requests
82+
router.use("*", async (c, next) => {
83+
const actor = await actorDriver.loadActor(c.env.actorId);
84+
actor.__beginHonoHttpRequest();
85+
try {
86+
await next();
87+
} finally {
88+
actor.__endHonoHttpRequest();
89+
}
90+
});
91+
8192
router.get("/", (c) => {
8293
return c.text(
8394
"This is an RivetKit actor.\n\nLearn more at https://rivetkit.org",

rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -550,11 +550,11 @@ export class EngineActorDriver implements ActorDriver {
550550
});
551551
}
552552

553-
async sleep(actorId: string) {
553+
startSleep(actorId: string) {
554554
this.#runner.sleepActor(actorId);
555555
}
556556

557-
async shutdown(immediate: boolean): Promise<void> {
557+
async shutdownRunner(immediate: boolean): Promise<void> {
558558
logger().info({ msg: "stopping engine actor driver" });
559559
await this.#runner.shutdown(immediate);
560560
}
@@ -565,7 +565,7 @@ export class EngineActorDriver implements ActorDriver {
565565
stream.onAbort(() => {});
566566
c.req.raw.signal.addEventListener("abort", () => {
567567
logger().debug("SSE aborted, shutting down runner");
568-
this.shutdown(true);
568+
this.shutdownRunner(true);
569569
});
570570

571571
await this.#runnerStarted.promise;

rivetkit-typescript/packages/rivetkit/src/drivers/file-system/actor.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,8 @@ export class FileSystemActorDriver implements ActorDriver {
7979
return this.#state.createDatabase(actorId);
8080
}
8181

82-
sleep(actorId: string): Promise<void> {
83-
return this.#state.sleepActor(actorId);
82+
startSleep(actorId: string): void {
83+
// Spawns the sleepActor promise
84+
this.#state.sleepActor(actorId);
8485
}
8586
}

rivetkit-typescript/packages/rivetkit/tests/driver-engine.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ runDriverTests({
7878
},
7979
driver: driverConfig,
8080
cleanup: async () => {
81-
await actorDriver.shutdown?.(true);
81+
await actorDriver.shutdownRunner?.(true);
8282
},
8383
};
8484
},

0 commit comments

Comments
 (0)