From ddef08fe35cc3fc315201ea1d4c44afddeb0674a Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Tue, 4 Nov 2025 20:01:26 -0800 Subject: [PATCH] fix(next-js): auto shut down runners when source code updates --- .../packages/next-js/src/mod.ts | 172 +++++++++++++++++- .../src/drivers/engine/actor-driver.ts | 6 +- 2 files changed, 173 insertions(+), 5 deletions(-) diff --git a/rivetkit-typescript/packages/next-js/src/mod.ts b/rivetkit-typescript/packages/next-js/src/mod.ts index 097609eafe..cc81b4af3b 100644 --- a/rivetkit-typescript/packages/next-js/src/mod.ts +++ b/rivetkit-typescript/packages/next-js/src/mod.ts @@ -1,4 +1,7 @@ +import { existsSync, statSync } from "node:fs"; +import { join } from "node:path"; import type { Registry, RunConfigInput } from "rivetkit"; +import { stringifyError } from "rivetkit/utils"; import { logger } from "./log"; export const toNextHandler = ( @@ -11,8 +14,8 @@ export const toNextHandler = ( // Configure serverless inputConfig.runnerKind = "serverless"; - // Auto-configure serverless runner if not in prod if (process.env.NODE_ENV !== "production") { + // Auto-configure serverless runner if not in prod logger().debug( "detected development environment, auto-starting engine and auto-configuring serverless", ); @@ -42,17 +45,24 @@ export const toNextHandler = ( const { fetch } = registry.start(inputConfig); + // Function that Next will call when handling requests const fetchWrapper = async ( request: Request, { params }: { params: Promise<{ all: string[] }> }, - ) => { + ): Promise => { const { all } = await params; const newUrl = new URL(request.url); newUrl.pathname = all.join("/"); - const newReq = new Request(newUrl, request); - return await fetch(newReq); + if (process.env.NODE_ENV !== "development") { + // Handle request + const newReq = new Request(newUrl, request); + return await fetch(newReq); + } else { + // Special request handling for file watching + return await handleRequestWithFileWatcher(request, newUrl, fetch); + } }; return { @@ -64,3 +74,157 @@ export const toNextHandler = ( OPTIONS: fetchWrapper, }; }; + +/** + * Special request handler that will watch the source file to terminate this + * request once complete. + * + * See docs on watchRouteFile for more information. + */ +async function handleRequestWithFileWatcher( + request: Request, + newUrl: URL, + fetch: (request: Request, ...args: any) => Response | Promise, +): Promise { + // Create a new abort controller that we can abort, since the signal on + // the request we cannot control + const mergedController = new AbortController(); + const abortMerged = () => mergedController.abort(); + request.signal?.addEventListener("abort", abortMerged); + + // Watch for file changes in dev + // + // We spawn one watcher per-request since there is not a clean way of + // cleaning up global watchers when hot reloading in Next + const watchIntervalId = watchRouteFile(mergedController); + + // Clear interval if request is aborted + request.signal.addEventListener("abort", () => { + logger().debug("clearing file watcher interval: request aborted"); + clearInterval(watchIntervalId); + }); + + // Replace URL and abort signal + const newReq = new Request(newUrl, { + // Copy old request properties + method: request.method, + headers: request.headers, + body: request.body, + credentials: request.credentials, + cache: request.cache, + redirect: request.redirect, + referrer: request.referrer, + integrity: request.integrity, + // Override with new signal + signal: mergedController.signal, + }); + + // Handle request + const response = await fetch(newReq); + + // HACK: Next.js does not provide a way to detect when a request + // finishes, so we need to tap the response stream + // + // We can't just wait for `await fetch` to finish since SSE streams run + // for longer + if (response.body) { + const wrappedStream = waitForStreamFinish(response.body, () => { + logger().debug("clearing file watcher interval: stream finished"); + clearInterval(watchIntervalId); + }); + return new Response(wrappedStream, { + status: response.status, + statusText: response.statusText, + headers: response.headers, + }); + } else { + // No response body, clear interval immediately + logger().debug("clearing file watcher interval: no response body"); + clearInterval(watchIntervalId); + return response; + } +} + +/** + * HACK: Watch for file changes on this route in order to shut down the runner. + * We do this because Next.js does not terminate long-running requests on file + * change, so we need to manually shut down the runner in order to trigger a + * new `/start` request with the new code. + * + * We don't use file watchers since those are frequently buggy x-platform and + * subject to misconfigured inotify limits. + */ +function watchRouteFile(abortController: AbortController): NodeJS.Timeout { + logger().debug("starting file watcher"); + + const routePath = join( + process.cwd(), + ".next/server/app/api/rivet/[...all]/route.js", + ); + + let lastMtime: number | null = null; + const checkFile = () => { + logger().debug({ msg: "checking for file changes", routePath }); + try { + if (!existsSync(routePath)) { + return; + } + + const stats = statSync(routePath); + const mtime = stats.mtimeMs; + + if (lastMtime !== null && mtime !== lastMtime) { + logger().info({ msg: "route file changed", routePath }); + abortController.abort(); + } + + lastMtime = mtime; + } catch (err) { + logger().info({ + msg: "failed to check for route file change", + err: stringifyError(err), + }); + } + }; + + checkFile(); + + return setInterval(checkFile, 1000); +} + +/** + * Waits for a stream to finish and calls onFinish on complete. + * + * Used for cancelling the file watcher. + */ +function waitForStreamFinish( + body: ReadableStream, + onFinish: () => void, +): ReadableStream { + const reader = body.getReader(); + return new ReadableStream({ + async start(controller) { + try { + while (true) { + const { done, value } = await reader.read(); + if (done) { + logger().debug("stream completed"); + onFinish(); + controller.close(); + break; + } + controller.enqueue(value); + } + } catch (err) { + logger().debug("stream errored"); + onFinish(); + controller.error(err); + } + }, + cancel() { + logger().debug("stream cancelled"); + onFinish(); + reader.cancel(); + }, + }); +} diff --git a/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts b/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts index 44ed514a1d..b824ff5e13 100644 --- a/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts +++ b/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts @@ -646,7 +646,11 @@ export class EngineActorDriver implements ActorDriver { stream.onAbort(() => {}); c.req.raw.signal.addEventListener("abort", () => { logger().debug("SSE aborted, shutting down runner"); - this.shutdownRunner(true); + + // We cannot assume that the request will always be closed gracefully by Rivet. We always proceed with a graceful shutdown in case the request was terminated for any other reason. + // + // If we did not use a graceful shutdown, the runner would + this.shutdownRunner(false); }); await this.#runnerStarted.promise;