From 713729da851206a3d2bf5936b0e509b96411f56d Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Thu, 3 Apr 2025 11:53:01 +0100 Subject: [PATCH 1/7] make worker nodetype configurable --- apps/supervisor/src/env.ts | 1 + apps/supervisor/src/workloadManager/kubernetes.ts | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/apps/supervisor/src/env.ts b/apps/supervisor/src/env.ts index 6c64c0bc44..50c85f98c1 100644 --- a/apps/supervisor/src/env.ts +++ b/apps/supervisor/src/env.ts @@ -50,6 +50,7 @@ const Env = z.object({ // Kubernetes specific settings KUBERNETES_FORCE_ENABLED: BoolEnv.default(false), KUBERNETES_NAMESPACE: z.string().default("default"), + KUBERNETES_WORKER_NODETYPE_LABEL: z.string().default("v4-worker"), EPHEMERAL_STORAGE_SIZE_LIMIT: z.string().default("10Gi"), EPHEMERAL_STORAGE_SIZE_REQUEST: z.string().default("2Gi"), diff --git a/apps/supervisor/src/workloadManager/kubernetes.ts b/apps/supervisor/src/workloadManager/kubernetes.ts index 2019316d49..f01169eaa3 100644 --- a/apps/supervisor/src/workloadManager/kubernetes.ts +++ b/apps/supervisor/src/workloadManager/kubernetes.ts @@ -217,7 +217,7 @@ export class KubernetesWorkloadManager implements WorkloadManager { automountServiceAccountToken: false, imagePullSecrets: this.getImagePullSecrets(), nodeSelector: { - nodetype: "worker-re2", + nodetype: env.KUBERNETES_WORKER_NODETYPE_LABEL, }, }; } From 2fef889ef4bd19f149c4812c94c6dd3632b81bf6 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Fri, 4 Apr 2025 02:15:09 +0100 Subject: [PATCH 2/7] make max dequeue count configurable from supervisor --- apps/supervisor/src/env.ts | 1 + apps/supervisor/src/index.ts | 1 + apps/webapp/app/routes/engine.v1.worker-actions.dequeue.ts | 6 +++++- .../core/src/v3/runEngineWorker/supervisor/queueConsumer.ts | 4 ++++ packages/core/src/v3/runEngineWorker/supervisor/schemas.ts | 1 + 
packages/core/src/v3/runEngineWorker/supervisor/session.ts | 2 ++ 6 files changed, 14 insertions(+), 1 deletion(-) diff --git a/apps/supervisor/src/env.ts b/apps/supervisor/src/env.ts index 50c85f98c1..d7caccbd80 100644 --- a/apps/supervisor/src/env.ts +++ b/apps/supervisor/src/env.ts @@ -31,6 +31,7 @@ const Env = z.object({ // Dequeue settings (provider mode) TRIGGER_DEQUEUE_ENABLED: BoolEnv.default("true"), TRIGGER_DEQUEUE_INTERVAL_MS: z.coerce.number().int().default(1000), + TRIGGER_DEQUEUE_MAX_RUN_COUNT: z.coerce.number().int().default(10), // Optional services TRIGGER_WARM_START_URL: z.string().optional(), diff --git a/apps/supervisor/src/index.ts b/apps/supervisor/src/index.ts index 2f4143e4c4..6bfa8b8862 100644 --- a/apps/supervisor/src/index.ts +++ b/apps/supervisor/src/index.ts @@ -113,6 +113,7 @@ class ManagedSupervisor { managedWorkerSecret: env.MANAGED_WORKER_SECRET, dequeueIntervalMs: env.TRIGGER_DEQUEUE_INTERVAL_MS, queueConsumerEnabled: env.TRIGGER_DEQUEUE_ENABLED, + maxRunCount: env.TRIGGER_DEQUEUE_MAX_RUN_COUNT, runNotificationsEnabled: env.TRIGGER_WORKLOAD_API_ENABLED, preDequeue: async () => { if (this.isKubernetes) { diff --git a/apps/webapp/app/routes/engine.v1.worker-actions.dequeue.ts b/apps/webapp/app/routes/engine.v1.worker-actions.dequeue.ts index 4dd0798ad3..43eea55317 100644 --- a/apps/webapp/app/routes/engine.v1.worker-actions.dequeue.ts +++ b/apps/webapp/app/routes/engine.v1.worker-actions.dequeue.ts @@ -1,5 +1,8 @@ import { json, TypedResponse } from "@remix-run/server-runtime"; -import { WorkerApiDequeueRequestBody, WorkerApiDequeueResponseBody } from "@trigger.dev/core/v3/workers"; +import { + WorkerApiDequeueRequestBody, + WorkerApiDequeueResponseBody, +} from "@trigger.dev/core/v3/workers"; import { createActionWorkerApiRoute } from "~/services/routeBuilders/apiBuilder.server"; export const action = createActionWorkerApiRoute( @@ -10,6 +13,7 @@ export const action = createActionWorkerApiRoute( return json( await 
authenticatedWorker.dequeue({ maxResources: body.maxResources, + maxRunCount: body.maxRunCount, }) ); } diff --git a/packages/core/src/v3/runEngineWorker/supervisor/queueConsumer.ts b/packages/core/src/v3/runEngineWorker/supervisor/queueConsumer.ts index b46f69c6fa..ed2dfca78f 100644 --- a/packages/core/src/v3/runEngineWorker/supervisor/queueConsumer.ts +++ b/packages/core/src/v3/runEngineWorker/supervisor/queueConsumer.ts @@ -7,6 +7,7 @@ type RunQueueConsumerOptions = { intervalMs?: number; preDequeue?: PreDequeueFn; preSkip?: PreSkipFn; + maxRunCount?: number; onDequeue: (messages: WorkerApiDequeueResponseBody) => Promise<void>; }; @@ -14,6 +15,7 @@ export class RunQueueConsumer { private readonly client: SupervisorHttpClient; private readonly preDequeue?: PreDequeueFn; private readonly preSkip?: PreSkipFn; + private readonly maxRunCount?: number; private readonly onDequeue: (messages: WorkerApiDequeueResponseBody) => Promise<void>; private intervalMs: number; @@ -24,6 +26,7 @@ export class RunQueueConsumer { this.intervalMs = opts.intervalMs ?? 
5_000; this.preDequeue = opts.preDequeue; this.preSkip = opts.preSkip; + this.maxRunCount = opts.maxRunCount; this.onDequeue = opts.onDequeue; this.client = opts.client; } @@ -87,6 +90,7 @@ export class RunQueueConsumer { try { const response = await this.client.dequeue({ maxResources: preDequeueResult?.maxResources, + maxRunCount: this.maxRunCount, }); if (!response.success) { diff --git a/packages/core/src/v3/runEngineWorker/supervisor/schemas.ts b/packages/core/src/v3/runEngineWorker/supervisor/schemas.ts index 64ca509929..abae1e28b3 100644 --- a/packages/core/src/v3/runEngineWorker/supervisor/schemas.ts +++ b/packages/core/src/v3/runEngineWorker/supervisor/schemas.ts @@ -66,6 +66,7 @@ export type WorkerApiConnectResponseBody = z.infer; diff --git a/packages/core/src/v3/runEngineWorker/supervisor/session.ts b/packages/core/src/v3/runEngineWorker/supervisor/session.ts index 4ce3b312b4..8dd90a3b98 100644 --- a/packages/core/src/v3/runEngineWorker/supervisor/session.ts +++ b/packages/core/src/v3/runEngineWorker/supervisor/session.ts @@ -17,6 +17,7 @@ type SupervisorSessionOptions = SupervisorClientCommonOptions & { dequeueIntervalMs?: number; preDequeue?: PreDequeueFn; preSkip?: PreSkipFn; + maxRunCount?: number; }; export class SupervisorSession extends EventEmitter { @@ -44,6 +45,7 @@ export class SupervisorSession extends EventEmitter { preSkip: opts.preSkip, onDequeue: this.onDequeue.bind(this), intervalMs: opts.dequeueIntervalMs, + maxRunCount: opts.maxRunCount, }); // TODO: This should be dynamic and set by (or at least overridden by) the platform From c260a5ca37b4f5b1d42d758d0c38a0cf0ca623da Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Fri, 4 Apr 2025 02:45:09 +0100 Subject: [PATCH 3/7] deterministic runner ids --- apps/supervisor/src/util.ts | 4 ++++ apps/supervisor/src/workloadManager/docker.ts | 6 +++--- apps/supervisor/src/workloadManager/kubernetes.ts | 4 ++-- packages/core/src/v3/isomorphic/friendlyId.ts | 6 
------ 4 files changed, 9 insertions(+), 11 deletions(-) diff --git a/apps/supervisor/src/util.ts b/apps/supervisor/src/util.ts index 6ec9cd57d1..6e666cb1b1 100644 --- a/apps/supervisor/src/util.ts +++ b/apps/supervisor/src/util.ts @@ -4,3 +4,7 @@ export function getDockerHostDomain() { return isMacOs || isWindows ? "host.docker.internal" : "localhost"; } + +export function getRunnerId(runId: string) { + return `runner-${runId.replace("run_", "")}`; +} diff --git a/apps/supervisor/src/workloadManager/docker.ts b/apps/supervisor/src/workloadManager/docker.ts index dc658f304e..0fd6416e8d 100644 --- a/apps/supervisor/src/workloadManager/docker.ts +++ b/apps/supervisor/src/workloadManager/docker.ts @@ -1,5 +1,4 @@ import { SimpleStructuredLogger } from "@trigger.dev/core/v3/utils/structuredLogger"; -import { RunnerId } from "@trigger.dev/core/v3/isomorphic"; import { type WorkloadManager, type WorkloadManagerCreateOptions, @@ -7,7 +6,7 @@ import { } from "./types.js"; import { x } from "tinyexec"; import { env } from "../env.js"; -import { getDockerHostDomain } from "../util.js"; +import { getDockerHostDomain, getRunnerId } from "../util.js"; export class DockerWorkloadManager implements WorkloadManager { private readonly logger = new SimpleStructuredLogger("docker-workload-provider"); @@ -23,7 +22,8 @@ export class DockerWorkloadManager implements WorkloadManager { async create(opts: WorkloadManagerCreateOptions) { this.logger.log("[DockerWorkloadProvider] Creating container", { opts }); - const runnerId = RunnerId.generate(); + const runnerId = getRunnerId(opts.runFriendlyId); + const runArgs = [ "run", "--detach", diff --git a/apps/supervisor/src/workloadManager/kubernetes.ts b/apps/supervisor/src/workloadManager/kubernetes.ts index f01169eaa3..685ef42f10 100644 --- a/apps/supervisor/src/workloadManager/kubernetes.ts +++ b/apps/supervisor/src/workloadManager/kubernetes.ts @@ -4,10 +4,10 @@ import { type WorkloadManagerCreateOptions, type WorkloadManagerOptions, } 
from "./types.js"; -import { RunnerId } from "@trigger.dev/core/v3/isomorphic"; import type { EnvironmentType, MachinePreset } from "@trigger.dev/core/v3"; import { env } from "../env.js"; import { type K8sApi, createK8sApi, type k8s } from "../clients/kubernetes.js"; +import { getRunnerId } from "../util.js"; type ResourceQuantities = { [K in "cpu" | "memory" | "ephemeral-storage"]?: string; @@ -31,7 +31,7 @@ export class KubernetesWorkloadManager implements WorkloadManager { async create(opts: WorkloadManagerCreateOptions) { this.logger.log("[KubernetesWorkloadManager] Creating container", { opts }); - const runnerId = RunnerId.generate().replace(/_/g, "-"); + const runnerId = getRunnerId(opts.runFriendlyId); try { await this.k8s.core.createNamespacedPod({ diff --git a/packages/core/src/v3/isomorphic/friendlyId.ts b/packages/core/src/v3/isomorphic/friendlyId.ts index 36315f6c44..d87eb4d212 100644 --- a/packages/core/src/v3/isomorphic/friendlyId.ts +++ b/packages/core/src/v3/isomorphic/friendlyId.ts @@ -110,9 +110,3 @@ export class IdGenerator { return `${this.prefix}${customAlphabet(this.alphabet, this.length)()}`; } } - -export const RunnerId = new IdGenerator({ - alphabet: "123456789abcdefghijkmnopqrstuvwxyz", - length: 20, - prefix: "runner_", -}); From 5f45507d8b73446286b39782d524feb7f0278d80 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Fri, 4 Apr 2025 03:06:21 +0100 Subject: [PATCH 4/7] update runner id on restore --- .../src/entryPoints/managed-run-controller.ts | 12 ++++++- .../src/v3/runEngineWorker/workload/http.ts | 33 ++++++++++++------- 2 files changed, 33 insertions(+), 12 deletions(-) diff --git a/packages/cli-v3/src/entryPoints/managed-run-controller.ts b/packages/cli-v3/src/entryPoints/managed-run-controller.ts index 59d9923318..4fbce348b7 100644 --- a/packages/cli-v3/src/entryPoints/managed-run-controller.ts +++ b/packages/cli-v3/src/entryPoints/managed-run-controller.ts @@ -86,6 +86,7 @@ type Metadata 
= { TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS: number | undefined; TRIGGER_SUCCESS_EXIT_CODE: number | undefined; TRIGGER_FAILURE_EXIT_CODE: number | undefined; + TRIGGER_RUNNER_ID: string | undefined; }; class MetadataClient { @@ -126,6 +127,8 @@ class ManagedRunController { private workerApiUrl: string; private workerInstanceName: string; + private runnerId: string; + private successExitCode = env.TRIGGER_SUCCESS_EXIT_CODE; private failureExitCode = env.TRIGGER_FAILURE_EXIT_CODE; @@ -144,6 +147,8 @@ class ManagedRunController { this.workerManifest = opts.workerManifest; + this.runnerId = env.TRIGGER_RUNNER_ID; + this.heartbeatIntervalSeconds = env.TRIGGER_HEARTBEAT_INTERVAL_SECONDS; this.snapshotPollIntervalSeconds = env.TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS; @@ -156,7 +161,7 @@ class ManagedRunController { this.httpClient = new WorkloadHttpClient({ workerApiUrl: this.workerApiUrl, - runnerId: env.TRIGGER_RUNNER_ID, + runnerId: this.runnerId, deploymentId: env.TRIGGER_DEPLOYMENT_ID, deploymentVersion: env.TRIGGER_DEPLOYMENT_VERSION, projectRef: env.TRIGGER_PROJECT_REF, @@ -709,6 +714,11 @@ class ManagedRunController { this.httpClient.updateApiUrl(this.workerApiUrl); } + + if (overrides.TRIGGER_RUNNER_ID) { + this.runnerId = overrides.TRIGGER_RUNNER_ID; + this.httpClient.updateRunnerId(this.runnerId); + } } private async startAndExecuteRunAttempt({ diff --git a/packages/core/src/v3/runEngineWorker/workload/http.ts b/packages/core/src/v3/runEngineWorker/workload/http.ts index 6fb300bdc4..9d97896f09 100644 --- a/packages/core/src/v3/runEngineWorker/workload/http.ts +++ b/packages/core/src/v3/runEngineWorker/workload/http.ts @@ -20,13 +20,13 @@ type WorkloadHttpClientOptions = WorkloadClientCommonOptions; export class WorkloadHttpClient { private apiUrl: string; + private runnerId: string; private readonly deploymentId: string; - private readonly defaultHeaders: Record<string, string>; - constructor(opts: WorkloadHttpClientOptions) { + constructor(private opts: 
WorkloadHttpClientOptions) { this.apiUrl = opts.workerApiUrl.replace(/\/$/, ""); - this.defaultHeaders = getDefaultWorkloadHeaders(opts); this.deploymentId = opts.deploymentId; + this.runnerId = opts.runnerId; if (!this.apiUrl) { throw new Error("apiURL is required and needs to be a non-empty string"); @@ -41,6 +41,17 @@ export class WorkloadHttpClient { this.apiUrl = apiUrl.replace(/\/$/, ""); } + updateRunnerId(runnerId: string) { + this.runnerId = runnerId; + } + + defaultHeaders(): Record<string, string> { + return getDefaultWorkloadHeaders({ + ...this.opts, + runnerId: this.runnerId, + }); + } + async heartbeatRun(runId: string, snapshotId: string, body?: WorkloadHeartbeatRequestBody) { return wrapZodFetch( WorkloadHeartbeatResponseBody, @@ -48,7 +59,7 @@ export class WorkloadHttpClient { { method: "POST", headers: { - ...this.defaultHeaders, + ...this.defaultHeaders(), "Content-Type": "application/json", }, body: JSON.stringify(body ?? {}), @@ -63,7 +74,7 @@ export class WorkloadHttpClient { { method: "GET", headers: { - ...this.defaultHeaders, + ...this.defaultHeaders(), }, } ); @@ -76,7 +87,7 @@ export class WorkloadHttpClient { { method: "GET", headers: { - ...this.defaultHeaders, + ...this.defaultHeaders(), }, } ); @@ -93,7 +104,7 @@ export class WorkloadHttpClient { { method: "POST", headers: { - ...this.defaultHeaders, + ...this.defaultHeaders(), }, body: JSON.stringify(body), } @@ -111,7 +122,7 @@ export class WorkloadHttpClient { { method: "POST", headers: { - ...this.defaultHeaders, + ...this.defaultHeaders(), }, body: JSON.stringify(body), } @@ -125,7 +136,7 @@ export class WorkloadHttpClient { { method: "GET", headers: { - ...this.defaultHeaders, + ...this.defaultHeaders(), }, } ); @@ -139,7 +150,7 @@ export class WorkloadHttpClient { { method: "POST", headers: { - ...this.defaultHeaders, + ...this.defaultHeaders(), "Content-Type": "application/json", }, body: JSON.stringify(body), @@ -161,7 +172,7 @@ export class WorkloadHttpClient { { method: "GET", headers: { - 
...this.defaultHeaders, + ...this.defaultHeaders(), }, } ); From b0fab0a78056dbb03aaadf3fcc4041ee192cdefa Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Fri, 4 Apr 2025 03:18:31 +0100 Subject: [PATCH 5/7] disable resource monitor --- apps/supervisor/src/index.ts | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/apps/supervisor/src/index.ts b/apps/supervisor/src/index.ts index 6bfa8b8862..e18dad077b 100644 --- a/apps/supervisor/src/index.ts +++ b/apps/supervisor/src/index.ts @@ -99,7 +99,10 @@ class ManagedSupervisor { this.logger.warn("[ManagedWorker] Failed pod handler disabled"); } - this.resourceMonitor = new KubernetesResourceMonitor(createK8sApi(), ""); + this.resourceMonitor = new KubernetesResourceMonitor( + createK8sApi(), + env.TRIGGER_WORKER_INSTANCE_NAME + ); this.workloadManager = new KubernetesWorkloadManager(workloadManagerOptions); } else { this.resourceMonitor = new DockerResourceMonitor(new Docker()); @@ -117,7 +120,7 @@ class ManagedSupervisor { runNotificationsEnabled: env.TRIGGER_WORKLOAD_API_ENABLED, preDequeue: async () => { if (this.isKubernetes) { - // TODO: Test k8s resource monitor and remove this + // Not used in k8s for now return {}; } @@ -235,10 +238,11 @@ class ManagedSupervisor { snapshotFriendlyId: message.snapshot.friendlyId, }); - this.resourceMonitor.blockResources({ - cpu: message.run.machine.cpu, - memory: message.run.machine.memory, - }); + // Disabled for now + // this.resourceMonitor.blockResources({ + // cpu: message.run.machine.cpu, + // memory: message.run.machine.memory, + // }); } catch (error) { this.logger.error("[ManagedWorker] Failed to create workload", { error }); } From d6ca433e0e271b22e75854caf1e8299bffa91866 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Fri, 4 Apr 2025 10:39:01 +0100 Subject: [PATCH 6/7] explicit run controller type imports --- .../src/entryPoints/managed-run-controller.ts | 15 
++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/packages/cli-v3/src/entryPoints/managed-run-controller.ts b/packages/cli-v3/src/entryPoints/managed-run-controller.ts index 4fbce348b7..fc76055212 100644 --- a/packages/cli-v3/src/entryPoints/managed-run-controller.ts +++ b/packages/cli-v3/src/entryPoints/managed-run-controller.ts @@ -5,24 +5,25 @@ import { z } from "zod"; import { randomUUID } from "crypto"; import { readJSONFile } from "../utilities/fileSystem.js"; import { - CompleteRunAttemptResult, + type CompleteRunAttemptResult, HeartbeatService, - RunExecutionData, - TaskRunExecutionResult, - TaskRunFailedExecutionResult, + type RunExecutionData, + type TaskRunExecutionResult, + type TaskRunFailedExecutionResult, WorkerManifest, } from "@trigger.dev/core/v3"; import { WarmStartClient, WORKLOAD_HEADERS, - WorkloadClientToServerEvents, + type WorkloadClientToServerEvents, + type WorkloadDebugLogRequestBody, WorkloadHttpClient, - WorkloadServerToClientEvents, + type WorkloadServerToClientEvents, type WorkloadRunAttemptStartResponseBody, } from "@trigger.dev/core/v3/workers"; import { assertExhaustive } from "../utilities/assertExhaustive.js"; import { setTimeout as sleep } from "timers/promises"; -import { io, Socket } from "socket.io-client"; +import { io, type Socket } from "socket.io-client"; // All IDs are friendly IDs const Env = z.object({ From 1512c7a3868f9668ff14807b73627ed4081af38a Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Fri, 4 Apr 2025 10:39:27 +0100 Subject: [PATCH 7/7] enhance debug logs --- .../src/entryPoints/managed-run-controller.ts | 113 +++++++++++++----- 1 file changed, 82 insertions(+), 31 deletions(-) diff --git a/packages/cli-v3/src/entryPoints/managed-run-controller.ts b/packages/cli-v3/src/entryPoints/managed-run-controller.ts index fc76055212..cc477c2f12 100644 --- a/packages/cli-v3/src/entryPoints/managed-run-controller.ts +++ 
b/packages/cli-v3/src/entryPoints/managed-run-controller.ts @@ -188,8 +188,8 @@ class ManagedRunController { console.debug("[ManagedRunController] Polling for latest snapshot"); - this.httpClient.sendDebugLog(this.runFriendlyId, { - time: new Date(), + this.sendDebugLog({ + runId: this.runFriendlyId, message: `snapshot poll: started`, properties: { snapshotId: this.snapshotFriendlyId, @@ -201,8 +201,8 @@ class ManagedRunController { if (!response.success) { console.error("[ManagedRunController] Snapshot poll failed", { error: response.error }); - this.httpClient.sendDebugLog(this.runFriendlyId, { - time: new Date(), + this.sendDebugLog({ + runId: this.runFriendlyId, message: `snapshot poll: failed`, properties: { snapshotId: this.snapshotFriendlyId, @@ -269,8 +269,8 @@ class ManagedRunController { // This should only be used when we're already executing a run. Attempt number changes are not allowed. private updateRunPhase(run: Run, snapshot: Snapshot) { if (this.state.phase !== "RUN") { - this.httpClient.sendDebugLog(run.friendlyId, { - time: new Date(), + this.sendDebugLog({ + runId: run.friendlyId, message: `updateRunPhase: Invalid phase for updating snapshot: ${this.state.phase}`, properties: { currentPhase: this.state.phase, @@ -282,8 +282,8 @@ class ManagedRunController { } if (this.state.run.friendlyId !== run.friendlyId) { - this.httpClient.sendDebugLog(run.friendlyId, { - time: new Date(), + this.sendDebugLog({ + runId: run.friendlyId, message: `updateRunPhase: Mismatched run IDs`, properties: { currentRunId: this.state.run.friendlyId, @@ -299,8 +299,8 @@ class ManagedRunController { if (this.state.snapshot.friendlyId === snapshot.friendlyId) { logger.debug("updateRunPhase: Snapshot not changed", { run, snapshot }); - this.httpClient.sendDebugLog(run.friendlyId, { - time: new Date(), + this.sendDebugLog({ + runId: run.friendlyId, message: `updateRunPhase: Snapshot not changed`, properties: { snapshotId: snapshot.friendlyId, @@ -311,8 +311,8 @@ class 
ManagedRunController { } if (this.state.run.attemptNumber !== run.attemptNumber) { - this.httpClient.sendDebugLog(run.friendlyId, { - time: new Date(), + this.sendDebugLog({ + runId: run.friendlyId, message: `updateRunPhase: Attempt number changed`, properties: { oldAttemptNumber: this.state.run.attemptNumber ?? undefined, @@ -418,9 +418,9 @@ class ManagedRunController { snapshotId: this.snapshotFriendlyId, }); - this.httpClient.sendDebugLog(run.friendlyId, { - time: new Date(), - message: `snapshot change: missing snapshot ID`, + this.sendDebugLog({ + runId: run.friendlyId, + message: "snapshot change: missing snapshot ID", properties: { newSnapshotId: snapshot.friendlyId, newSnapshotStatus: snapshot.executionStatus, @@ -433,9 +433,9 @@ class ManagedRunController { if (this.snapshotFriendlyId === snapshot.friendlyId) { console.debug("handleSnapshotChange: snapshot not changed, skipping", { snapshot }); - this.httpClient.sendDebugLog(run.friendlyId, { - time: new Date(), - message: `snapshot change: skipping, no change`, + this.sendDebugLog({ + runId: run.friendlyId, + message: "snapshot change: skipping, no change", properties: { snapshotId: this.snapshotFriendlyId, snapshotStatus: snapshot.executionStatus, @@ -452,8 +452,8 @@ class ManagedRunController { completedWaitpoints: completedWaitpoints.length, }); - this.httpClient.sendDebugLog(run.friendlyId, { - time: new Date(), + this.sendDebugLog({ + runId: run.friendlyId, message: `snapshot change: ${snapshot.executionStatus}`, properties: { oldSnapshotId: this.snapshotFriendlyId, @@ -471,6 +471,15 @@ class ManagedRunController { error, }); + this.sendDebugLog({ + runId: run.friendlyId, + message: "snapshot change: failed to update run phase", + properties: { + currentPhase: this.state.phase, + error: error instanceof Error ? 
error.message : String(error), + }, + }); + this.waitForNextRun(); return; } @@ -545,8 +554,8 @@ class ManagedRunController { error: suspendResult.error, }); - this.httpClient.sendDebugLog(run.friendlyId, { - time: new Date(), + this.sendDebugLog({ + runId: run.friendlyId, message: "checkpoint: suspend request failed", properties: { snapshotId: snapshot.friendlyId, @@ -562,8 +571,8 @@ class ManagedRunController { suspendResult: suspendResult.data, }); - this.httpClient.sendDebugLog(run.friendlyId, { - time: new Date(), + this.sendDebugLog({ + runId: run.friendlyId, message: "checkpoint: failed to suspend run", properties: { snapshotId: snapshot.friendlyId, @@ -651,9 +660,9 @@ class ManagedRunController { } catch (error) { console.error("handleSnapshotChange: unexpected error", { error }); - this.httpClient.sendDebugLog(run.friendlyId, { - time: new Date(), - message: `snapshot change: unexpected error`, + this.sendDebugLog({ + runId: run.friendlyId, + message: "snapshot change: unexpected error", properties: { snapshotId: snapshot.friendlyId, error: error instanceof Error ? 
error.message : String(error), @@ -868,8 +877,8 @@ class ManagedRunController { }); if (previousRunId) { - this.httpClient.sendDebugLog(previousRunId, { - time: new Date(), + this.sendDebugLog({ + runId: previousRunId, message: "warm start: received config", properties: { connectionTimeoutMs, @@ -931,8 +940,8 @@ class ManagedRunController { this.socket.on("run:notify", async ({ version, run }) => { console.log("[ManagedRunController] Received run notification", { version, run }); - this.httpClient.sendDebugLog(run.friendlyId, { - time: new Date(), + this.sendDebugLog({ + runId: run.friendlyId, message: "run:notify received by runner", }); @@ -951,6 +960,16 @@ class ManagedRunController { currentRunId: this.runFriendlyId, currentSnapshotId: this.snapshotFriendlyId, }); + + this.sendDebugLog({ + runId: run.friendlyId, + message: "run:notify: ignoring notification for different run", + properties: { + currentRunId: this.runFriendlyId, + currentSnapshotId: this.snapshotFriendlyId, + notificationRunId: run.friendlyId, + }, + }); return; } @@ -961,6 +980,16 @@ class ManagedRunController { if (!latestSnapshot.success) { console.error("Failed to get latest snapshot data", latestSnapshot.error); + + this.sendDebugLog({ + runId: this.runFriendlyId, + message: "run:notify: failed to get latest snapshot data", + properties: { + currentRunId: this.runFriendlyId, + currentSnapshotId: this.snapshotFriendlyId, + error: latestSnapshot.error, + }, + }); return; } @@ -1126,6 +1155,28 @@ class ManagedRunController { assertExhaustive(attemptStatus); } + sendDebugLog({ + runId, + message, + date, + properties, + }: { + runId: string; + message: string; + date?: Date; + properties?: WorkloadDebugLogRequestBody["properties"]; + }) { + this.httpClient.sendDebugLog(runId, { + message, + time: date ?? 
new Date(), + properties: { + ...properties, + runnerId: this.runnerId, + workerName: this.workerInstanceName, + }, + }); + } + async cancelAttempt(runId: string) { logger.log("cancelling attempt", { runId });