Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/kitchen-sink/src/backend/actors/demo.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 10 additions & 2 deletions rivetkit-typescript/packages/rivetkit/src/actor/driver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,17 @@ export interface ActorDriver {
*/
getDatabase(actorId: string): Promise<unknown | undefined>;

sleep?(actorId: string): Promise<void>;
/**
* Requests the actor to go to sleep.
*
* This will call `_stop` independently.
*/
startSleep?(actorId: string): void;

shutdown?(immediate: boolean): Promise<void>;
/**
* Shuts down the actor runner.
*/
shutdownRunner?(immediate: boolean): Promise<void>;

// Serverless
/** This handles the serverless start request. This should manage the lifecycle of the runner tied to the request lifecycle. */
Expand Down
64 changes: 39 additions & 25 deletions rivetkit-typescript/packages/rivetkit/src/actor/instance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import { TO_CLIENT_VERSIONED } from "@/schemas/client-protocol/versioned";
import {
arrayBuffersEqual,
bufferToArrayBuffer,
EXTRA_ERROR_LOG,
getEnvUniversal,
promiseWithResolvers,
SinglePromiseQueue,
Expand Down Expand Up @@ -194,8 +195,11 @@ export class ActorInstance<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {

#sleepTimeout?: NodeJS.Timeout;

// Track active raw requests so sleep logic can account for them
#activeRawFetchCount = 0;
/**
* Track active HTTP requests through Hono router so sleep logic can
* account for them. Does not include WebSockets.
**/
#activeHonoHttpRequests = 0;
#activeRawWebSockets = new Set<UniversalWebSocket>();

#schedule!: Schedule;
Expand Down Expand Up @@ -287,7 +291,7 @@ export class ActorInstance<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
}

get #sleepingSupported(): boolean {
return this.#actorDriver.sleep !== undefined;
return this.#actorDriver.startSleep !== undefined;
}

/**
Expand Down Expand Up @@ -1517,10 +1521,6 @@ export class ActorInstance<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
throw new errors.FetchHandlerNotDefined();
}

// Track active raw fetch while handler runs
this.#activeRawFetchCount++;
this.#resetSleepTimer();

try {
const response = await this.#config.onFetch(
this.actorContext,
Expand All @@ -1538,12 +1538,6 @@ export class ActorInstance<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
});
throw error;
} finally {
// Decrement active raw fetch counter and re-evaluate sleep
this.#activeRawFetchCount = Math.max(
0,
this.#activeRawFetchCount - 1,
);
this.#resetSleepTimer();
this.#savePersistThrottled();
}
}
Expand Down Expand Up @@ -1880,6 +1874,29 @@ export class ActorInstance<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
}
}

/**
* Called by router middleware when an HTTP request begins.
*/
__beginHonoHttpRequest() {
this.#activeHonoHttpRequests++;
this.#resetSleepTimer();
}

/**
* Called by router middleware when an HTTP request ends.
*/
__endHonoHttpRequest() {
this.#activeHonoHttpRequests--;
if (this.#activeHonoHttpRequests < 0) {
this.#activeHonoHttpRequests = 0;
this.#rLog.warn({
msg: "active hono requests went below 0, this is a RivetKit bug",
...EXTRA_ERROR_LOG,
});
}
this.#resetSleepTimer();
}

// MARK: Sleep
/**
* Reset timer from the last actor interaction that allows it to be put to sleep.
Expand All @@ -1900,6 +1917,7 @@ export class ActorInstance<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
msg: "resetting sleep timer",
canSleep,
existingTimeout: !!this.#sleepTimeout,
timeout: this.#config.options.sleepTimeout,
});

if (this.#sleepTimeout) {
Expand All @@ -1912,12 +1930,7 @@ export class ActorInstance<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {

if (canSleep) {
this.#sleepTimeout = setTimeout(() => {
this._sleep().catch((error) => {
this.#rLog.error({
msg: "error during sleep",
error: stringifyError(error),
});
});
this._sleep();
}, this.#config.options.sleepTimeout);
}
}
Expand All @@ -1935,18 +1948,19 @@ export class ActorInstance<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
if (conn.status === "connected") return false;
}

// Do not sleep if raw fetches are in-flight
if (this.#activeRawFetchCount > 0) return false;
// Do not sleep if Hono HTTP requests are in-flight
if (this.#activeHonoHttpRequests > 0) return false;

// TODO: When WS hibernation is ready, update this to only count non-hibernatable websockets
// Do not sleep if there are raw websockets open
if (this.#activeRawWebSockets.size > 0) return false;

return true;
}

/** Puts an actor to sleep. This should just start the sleep sequence, most shutdown logic should be in _stop (which is called by the ActorDriver when sleeping). */
async _sleep() {
const sleep = this.#actorDriver.sleep?.bind(
_sleep() {
const sleep = this.#actorDriver.startSleep?.bind(
this.#actorDriver,
this.#actorId,
);
Expand All @@ -1962,11 +1976,11 @@ export class ActorInstance<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
this.#rLog.info({ msg: "actor sleeping" });

// Schedule sleep to happen on the next tick. This allows for any action that calls _sleep to complete.
setImmediate(async () => {
setImmediate(() => {
// The actor driver should call stop when ready to stop
//
// This will call _stop once Pegboard responds with the new status
await sleep();
sleep();
});
}

Expand Down
11 changes: 11 additions & 0 deletions rivetkit-typescript/packages/rivetkit/src/actor/router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,17 @@ export function createActorRouter(

router.use("*", loggerMiddleware(loggerWithoutContext()));

// Track all HTTP requests to prevent actor from sleeping during active requests
router.use("*", async (c, next) => {
const actor = await actorDriver.loadActor(c.env.actorId);
actor.__beginHonoHttpRequest();
try {
await next();
} finally {
actor.__endHonoHttpRequest();
}
});

router.get("/", (c) => {
return c.text(
"This is an RivetKit actor.\n\nLearn more at https://rivetkit.org",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -550,11 +550,11 @@ export class EngineActorDriver implements ActorDriver {
});
}

async sleep(actorId: string) {
startSleep(actorId: string) {
this.#runner.sleepActor(actorId);
}

async shutdown(immediate: boolean): Promise<void> {
async shutdownRunner(immediate: boolean): Promise<void> {
logger().info({ msg: "stopping engine actor driver" });
await this.#runner.shutdown(immediate);
}
Expand All @@ -565,7 +565,7 @@ export class EngineActorDriver implements ActorDriver {
stream.onAbort(() => {});
c.req.raw.signal.addEventListener("abort", () => {
logger().debug("SSE aborted, shutting down runner");
this.shutdown(true);
this.shutdownRunner(true);
});

await this.#runnerStarted.promise;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ export class FileSystemActorDriver implements ActorDriver {
return this.#state.createDatabase(actorId);
}

sleep(actorId: string): Promise<void> {
return this.#state.sleepActor(actorId);
startSleep(actorId: string): void {
// Spawns the sleepActor promise
this.#state.sleepActor(actorId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ runDriverTests({
},
driver: driverConfig,
cleanup: async () => {
await actorDriver.shutdown?.(true);
await actorDriver.shutdownRunner?.(true);
},
};
},
Expand Down
Loading