From 6f6cbd0680aefd95f8c7202edfbf197162fb8e4b Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Wed, 12 Feb 2025 16:55:17 +0000 Subject: [PATCH 1/5] remove unused imports --- apps/webapp/app/v3/services/completeAttempt.server.ts | 3 --- 1 file changed, 3 deletions(-) diff --git a/apps/webapp/app/v3/services/completeAttempt.server.ts b/apps/webapp/app/v3/services/completeAttempt.server.ts index 13d120aba6..9e64c1a55a 100644 --- a/apps/webapp/app/v3/services/completeAttempt.server.ts +++ b/apps/webapp/app/v3/services/completeAttempt.server.ts @@ -8,9 +8,7 @@ import { TaskRunExecutionRetry, TaskRunFailedExecutionResult, TaskRunSuccessfulExecutionResult, - exceptionEventEnhancer, flattenAttributes, - internalErrorFromUnexpectedExit, isManualOutOfMemoryError, sanitizeError, shouldRetryError, @@ -32,7 +30,6 @@ import { CancelAttemptService } from "./cancelAttempt.server"; import { CreateCheckpointService } from "./createCheckpoint.server"; import { FinalizeTaskRunService } from "./finalizeTaskRun.server"; import { RetryAttemptService } from "./retryAttempt.server"; -import { updateMetadataService } from "~/services/metadata/updateMetadata.server"; import { getTaskEventStoreTableForRun } from "../taskEventStore.server"; type FoundAttempt = Awaited>; From 385fd31cc379e29dd2ba9c83b2efb518219e24d5 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Wed, 12 Feb 2025 16:56:58 +0000 Subject: [PATCH 2/5] tell run to exit before force requeue --- apps/webapp/app/v3/services/completeAttempt.server.ts | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/apps/webapp/app/v3/services/completeAttempt.server.ts b/apps/webapp/app/v3/services/completeAttempt.server.ts index 9e64c1a55a..a1105717fa 100644 --- a/apps/webapp/app/v3/services/completeAttempt.server.ts +++ b/apps/webapp/app/v3/services/completeAttempt.server.ts @@ -31,6 +31,7 @@ import { CreateCheckpointService } from "./createCheckpoint.server"; import { FinalizeTaskRunService } from "./finalizeTaskRun.server"; import { RetryAttemptService } from "./retryAttempt.server"; import { getTaskEventStoreTableForRun } from "../taskEventStore.server"; +import { socketIo } from "../handleSocketIo.server"; type FoundAttempt = Awaited>; @@ -504,6 +505,14 @@ export class CompleteAttemptService extends BaseService { if (forceRequeue) { logger.debug("[CompleteAttemptService] Forcing retry via queue", { runId: run.id }); + + // The run won't know it should shut down as we make the decision to force requeue here + // This also ensures that this change is backwards compatible with older workers + socketIo.coordinatorNamespace.emit("REQUEST_RUN_CANCELLATION", { + version: "v1", + runId: run.id, + }); + await retryViaQueue(); return; } From 83dff56a8d365d3ce5efa7e9dd425f23b7fffc9f Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Wed, 12 Feb 2025 17:40:13 +0000 Subject: [PATCH 3/5] handle exit for case where we already retried after oom --- .../app/v3/services/completeAttempt.server.ts | 31 +++++++++++++------ 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/apps/webapp/app/v3/services/completeAttempt.server.ts b/apps/webapp/app/v3/services/completeAttempt.server.ts index a1105717fa..2a595000a9 100644 --- a/apps/webapp/app/v3/services/completeAttempt.server.ts +++ b/apps/webapp/app/v3/services/completeAttempt.server.ts @@ -254,9 +254,11 @@ export class CompleteAttemptService extends BaseService { let retriableError = shouldRetryError(taskRunErrorEnhancer(completion.error)); let isOOMRetry = false; + let isOOMAttempt = isOOMError(completion.error); + let isOnMaxOOMMachine = false; - //OOM errors should retry (if an OOM machine is specified) - if (isOOMError(completion.error)) { + //OOM errors should retry (if an OOM machine is specified, and we're not already on it) + if (isOOMAttempt) { const retryConfig = FailedTaskRunRetryHelper.getRetryConfig({ run: { ...taskRunAttempt.taskRun, @@ -266,10 +268,10 @@ export class CompleteAttemptService extends BaseService { execution, }); - if ( - retryConfig?.outOfMemory?.machine && - retryConfig.outOfMemory.machine !== taskRunAttempt.taskRun.machinePreset - ) { + isOnMaxOOMMachine = + retryConfig?.outOfMemory?.machine === taskRunAttempt.taskRun.machinePreset; + + if (retryConfig?.outOfMemory?.machine && !isOnMaxOOMMachine) { //we will retry isOOMRetry = true; retriableError = true; @@ -312,6 +314,11 @@ export class CompleteAttemptService extends BaseService { // The attempt has failed and we won't retry + if (isOOMAttempt && isOnMaxOOMMachine) { + // The attempt failed due to an OOM error but we're already on the machine we should retry on + exitRun(taskRunAttempt.taskRunId); + } + // Now we need to "complete" the task run event/span await eventRepository.completeEvent( getTaskEventStoreTableForRun(taskRunAttempt.taskRun), @@ -508,10 +515,7 @@ export class CompleteAttemptService extends BaseService { // The run won't know it should shut down as we make the decision to force requeue here // This also ensures that this change is backwards compatible with older workers - socketIo.coordinatorNamespace.emit("REQUEST_RUN_CANCELLATION", { - version: "v1", - runId: run.id, - }); + exitRun(run.id); await retryViaQueue(); return; @@ -759,3 +763,10 @@ function isOOMError(error: TaskRunError) { return false; } + +function exitRun(runId: string) { + socketIo.coordinatorNamespace.emit("REQUEST_RUN_CANCELLATION", { + version: "v1", + runId, + }); +} From 3c7ff85e3123a475a3c92cc7d9aa3f3fc8347f7b Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Wed, 12 Feb 2025 18:05:41 +0000 Subject: [PATCH 4/5] improve retry span and add machine props --- .../app/v3/services/completeAttempt.server.ts | 57 ++++++++++++------- 1 file changed, 35 insertions(+), 22 deletions(-) diff --git a/apps/webapp/app/v3/services/completeAttempt.server.ts b/apps/webapp/app/v3/services/completeAttempt.server.ts index 2a595000a9..fe72d817c5 100644 --- a/apps/webapp/app/v3/services/completeAttempt.server.ts +++ b/apps/webapp/app/v3/services/completeAttempt.server.ts @@ -1,5 +1,6 @@ import { Attributes } from "@opentelemetry/api"; import { + MachinePresetName, TaskRunContext, TaskRunError, TaskRunErrorCodes, @@ -256,6 +257,7 @@ export class CompleteAttemptService extends BaseService { let isOOMRetry = false; let isOOMAttempt = isOOMError(completion.error); let isOnMaxOOMMachine = false; + let oomMachine: MachinePresetName | undefined; //OOM errors should retry (if an OOM machine is specified, and we're not already on it) if (isOOMAttempt) { @@ -268,10 +270,10 @@ export class CompleteAttemptService extends BaseService { execution, }); - isOnMaxOOMMachine = - retryConfig?.outOfMemory?.machine === taskRunAttempt.taskRun.machinePreset; + oomMachine = retryConfig?.outOfMemory?.machine; + isOnMaxOOMMachine = oomMachine === taskRunAttempt.taskRun.machinePreset; - if (retryConfig?.outOfMemory?.machine && !isOnMaxOOMMachine) { + if (oomMachine && !isOnMaxOOMMachine) { //we will retry isOOMRetry = true; retriableError = true; @@ -290,7 +292,7 @@ export class CompleteAttemptService extends BaseService { id: taskRunAttempt.taskRunId, }, data: { - machinePreset: retryConfig.outOfMemory.machine, + machinePreset: oomMachine, }, }); } @@ -309,6 +311,7 @@ export class CompleteAttemptService extends BaseService { environment, checkpoint, forceRequeue: isOOMRetry, + oomMachine, }); } @@ -554,6 +557,7 @@ export class CompleteAttemptService extends BaseService { environment, checkpoint, forceRequeue = false, + oomMachine, }: { execution: TaskRunExecution; executionRetry: TaskRunExecutionRetry; @@ -562,29 +566,38 @@ export class CompleteAttemptService extends BaseService { environment: AuthenticatedEnvironment; checkpoint?: CheckpointData; forceRequeue?: boolean; + /** Setting this will also alter the retry span message */ + oomMachine?: MachinePresetName; }) { const retryAt = new Date(executionRetry.timestamp); // Retry the task run - await eventRepository.recordEvent(`Retry #${execution.attempt.number} delay`, { - taskSlug: taskRunAttempt.taskRun.taskIdentifier, - environment, - attributes: { - metadata: this.#generateMetadataAttributesForNextAttempt(execution), - properties: { - retryAt: retryAt.toISOString(), - }, - runId: taskRunAttempt.taskRun.friendlyId, - style: { - icon: "schedule-attempt", + await eventRepository.recordEvent( + `Retry #${execution.attempt.number} delay${oomMachine ? " after OOM" : ""}`, + { + taskSlug: taskRunAttempt.taskRun.taskIdentifier, + environment, + attributes: { + metadata: this.#generateMetadataAttributesForNextAttempt(execution), + properties: { + retryAt: retryAt.toISOString(), + previousMachine: oomMachine + ? taskRunAttempt.taskRun.machinePreset ?? undefined + : undefined, + nextMachine: oomMachine, + }, + runId: taskRunAttempt.taskRun.friendlyId, + style: { + icon: "schedule-attempt", + }, + queueId: taskRunAttempt.queueId, + queueName: taskRunAttempt.taskRun.queue, }, - queueId: taskRunAttempt.queueId, - queueName: taskRunAttempt.taskRun.queue, - }, - context: taskRunAttempt.taskRun.traceContext as Record, - spanIdSeed: `retry-${taskRunAttempt.number + 1}`, - endTime: retryAt, - }); + context: taskRunAttempt.taskRun.traceContext as Record, + spanIdSeed: `retry-${taskRunAttempt.number + 1}`, + endTime: retryAt, + } + ); logger.debug("[CompleteAttemptService] Retrying", { taskRun: taskRunAttempt.taskRun.friendlyId, From 82933a58371fa7610007fb8e1f54ec674af71933 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Wed, 12 Feb 2025 18:20:15 +0000 Subject: [PATCH 5/5] don't try to exit run in dev --- apps/webapp/app/v3/services/completeAttempt.server.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/webapp/app/v3/services/completeAttempt.server.ts b/apps/webapp/app/v3/services/completeAttempt.server.ts index fe72d817c5..cc4472f1e7 100644 --- a/apps/webapp/app/v3/services/completeAttempt.server.ts +++ b/apps/webapp/app/v3/services/completeAttempt.server.ts @@ -317,7 +317,7 @@ export class CompleteAttemptService extends BaseService { // The attempt has failed and we won't retry - if (isOOMAttempt && isOnMaxOOMMachine) { + if (isOOMAttempt && isOnMaxOOMMachine && environment.type !== "DEVELOPMENT") { // The attempt failed due to an OOM error but we're already on the machine we should retry on exitRun(taskRunAttempt.taskRunId); }